Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9bddf99d
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9bddf99d
编写于
7月 25, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(query): do some internal refactor.
上级
112642a3
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
265 addition
and
591 deletion
+265
-591
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+226
-537
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+0
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+4
-12
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+30
-30
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+3
-8
未找到文件。
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
9bddf99d
...
...
@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn
->
numOfQueries
=
pBasic
->
queryDesc
?
taosArrayGetSize
(
pBasic
->
queryDesc
)
:
0
;
pBasic
->
queryDesc
=
NULL
;
mDebug
(
"queries updated in conn %
d
, num:%d"
,
pConn
->
id
,
pConn
->
numOfQueries
);
mDebug
(
"queries updated in conn %
u
, num:%d"
,
pConn
->
id
,
pConn
->
numOfQueries
);
taosWUnLockLatch
(
&
pConn
->
queryLock
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
9bddf99d
...
...
@@ -16,6 +16,12 @@
#include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
typedef
enum
{
EXTERNAL_ROWS_PREV
=
0x1
,
EXTERNAL_ROWS_MAIN
=
0x2
,
EXTERNAL_ROWS_NEXT
=
0x3
,
}
EContentData
;
typedef
struct
{
STbDataIter
*
iter
;
int32_t
index
;
...
...
@@ -70,9 +76,9 @@ typedef struct SFilesetIter {
}
SFilesetIter
;
typedef
struct
SFileDataBlockInfo
{
int32_t
tbBlockIdx
;
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
uint64_t
uid
;
int32_t
tbBlockIdx
;
}
SFileDataBlockInfo
;
typedef
struct
SDataBlockIter
{
...
...
@@ -99,12 +105,11 @@ typedef struct SReaderStatus {
SHashObj
*
pTableMap
;
// SHash<STableBlockScanInfo>
STableBlockScanInfo
*
pTableIter
;
// table iterator used in building in-memory buffer data blocks.
SFileBlockDumpInfo
fBlockDumpInfo
;
SDFileSet
*
pCurrentFileset
;
// current opened file set
SBlockData
fileBlockData
;
SFilesetIter
fileIter
;
SDataBlockIter
blockIter
;
bool
composedDataBlock
;
// the returned data block is a composed block or not
SDFileSet
*
pCurrentFileset
;
// current opened file set
SBlockData
fileBlockData
;
SFilesetIter
fileIter
;
SDataBlockIter
blockIter
;
bool
composedDataBlock
;
// the returned data block is a composed block or not
}
SReaderStatus
;
struct
STsdbReader
{
...
...
@@ -115,15 +120,17 @@ struct STsdbReader {
SSDataBlock
*
pResBlock
;
int32_t
capacity
;
SReaderStatus
status
;
char
*
idStr
;
// query info handle, for debug purpose
int32_t
type
;
// query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
char
*
idStr
;
// query info handle, for debug purpose
int32_t
type
;
// query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo
suppInfo
;
STsdbReadSnap
*
pReadSnap
;
SIOCostSummary
cost
;
STSchema
*
pSchema
;
SDataFReader
*
pFileReader
;
SVersionRange
verRange
;
SIOCostSummary
cost
;
STSchema
*
pSchema
;
SDataFReader
*
pFileReader
;
SVersionRange
verRange
;
int32_t
step
;
STsdbReader
*
innerReader
[
2
];
};
static
SFileDataBlockInfo
*
getCurrentBlockInfo
(
SDataBlockIter
*
pBlockIter
);
...
...
@@ -200,6 +207,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
pTsdbReader
->
idStr
);
}
tsdbDebug
(
"%p create %d tables scan-info, size:%.2f Kb, %s"
,
pTsdbReader
,
numOfTables
,
(
sizeof
(
STableBlockScanInfo
)
*
numOfTables
)
/
1024
.
0
,
pTsdbReader
->
idStr
);
return
pTableMap
;
}
...
...
@@ -328,7 +338,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
continue
;
}
tsdbDebug
(
"%p file found fid:%d for qrange:%"
PRId64
"-%"
PRId64
",
ignore,
%s"
,
pReader
,
fid
,
pReader
->
window
.
skey
,
tsdbDebug
(
"%p file found fid:%d for qrange:%"
PRId64
"-%"
PRId64
", %s"
,
pReader
,
fid
,
pReader
->
window
.
skey
,
pReader
->
window
.
ekey
,
pReader
->
idStr
);
return
true
;
}
...
...
@@ -378,7 +388,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
return
pResBlock
;
}
static
int32_t
tsdbReaderCreate
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STsdbReader
**
ppReader
,
const
char
*
idstr
)
{
static
int32_t
tsdbReaderCreate
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STsdbReader
**
ppReader
,
int32_t
capacity
,
const
char
*
idstr
)
{
int32_t
code
=
0
;
int8_t
level
=
0
;
STsdbReader
*
pReader
=
(
STsdbReader
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pReader
));
...
...
@@ -392,7 +402,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader
->
pTsdb
=
getTsdbByRetentions
(
pVnode
,
pCond
->
twindows
.
skey
,
pVnode
->
config
.
tsdbCfg
.
retentions
,
idstr
,
&
level
);
pReader
->
suid
=
pCond
->
suid
;
pReader
->
order
=
pCond
->
order
;
pReader
->
capacity
=
4096
;
pReader
->
capacity
=
capacity
;
pReader
->
idStr
=
(
idstr
!=
NULL
)
?
strdup
(
idstr
)
:
NULL
;
pReader
->
verRange
=
getQueryVerRange
(
pVnode
,
pCond
,
level
);
pReader
->
type
=
pCond
->
type
;
...
...
@@ -483,95 +493,6 @@ _end:
// return res;
// }
// static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT
// maxVer) {
// TSDBROW row = {0};
// STSRow *rmem = NULL, *rimem = NULL;
// if (pCheckInfo->iter) {
// if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
// rmem = row.pTSRow;
// }
// }
// if (pCheckInfo->iiter) {
// if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
// rimem = row.pTSRow;
// }
// }
// if (rmem == NULL && rimem == NULL) {
// return TSKEY_INITIAL_VAL;
// }
// if (rmem != NULL && rimem == NULL) {
// pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
// return TD_ROW_KEY(rmem);
// }
// if (rmem == NULL && rimem != NULL) {
// pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
// return TD_ROW_KEY(rimem);
// }
// TSKEY r1 = TD_ROW_KEY(rmem);
// TSKEY r2 = TD_ROW_KEY(rimem);
// if (r1 == r2) {
// if (TD_SUPPORT_UPDATE(update)) {
// pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
// } else {
// pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
// tsdbTbDataIterNext(pCheckInfo->iter);
// }
// return r1;
// } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
// pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
// return r1;
// } else {
// pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
// return r2;
// }
// }
// static bool moveToNextRowInMem(STableBlockScanInfo* pCheckInfo) {
// bool hasNext = false;
// if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
// if (pCheckInfo->iter != NULL) {
// hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
// }
// if (hasNext) {
// return hasNext;
// }
// if (pCheckInfo->iiter != NULL) {
// return tsdbTbDataIterGet(pCheckInfo->iiter, NULL);
// }
// } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
// if (pCheckInfo->iiter != NULL) {
// hasNext = tsdbTbDataIterNext(pCheckInfo->iiter);
// }
// if (hasNext) {
// return hasNext;
// }
// if (pCheckInfo->iter != NULL) {
// return tsdbTbDataIterGet(pCheckInfo->iter, NULL);
// }
// } else {
// if (pCheckInfo->iter != NULL) {
// hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
// }
// if (pCheckInfo->iiter != NULL) {
// hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext;
// }
// }
// return hasNext;
// }
// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
// int32_t firstSlot = 0;
// int32_t lastSlot = numOfBlocks - 1;
...
...
@@ -602,18 +523,22 @@ _end:
static
int32_t
doLoadBlockIndex
(
STsdbReader
*
pReader
,
SDataFReader
*
pFileReader
,
SArray
*
pIndexList
)
{
SArray
*
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
int64_t
st
=
taosGetTimestampUs
();
int32_t
code
=
tsdbReadBlockIdx
(
pFileReader
,
aBlockIdx
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_end
;
}
if
(
taosArrayGetSize
(
aBlockIdx
)
==
0
)
{
size_t
num
=
taosArrayGetSize
(
aBlockIdx
);
if
(
num
==
0
)
{
taosArrayClear
(
aBlockIdx
);
return
TSDB_CODE_SUCCESS
;
}
SBlockIdx
*
pBlockIdx
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
aBlockIdx
);
++
i
)
{
int64_t
et1
=
taosGetTimestampUs
();
SBlockIdx
*
pBlockIdx
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
pBlockIdx
=
(
SBlockIdx
*
)
taosArrayGet
(
aBlockIdx
,
i
);
// uid check
...
...
@@ -627,17 +552,6 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
continue
;
}
// todo: not valid info in bockIndex
// time range check
// if (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey) {
// continue;
// }
// version check
// if (pBlockIdx->minVersion > pReader->verRange.maxVer || pBlockIdx->maxVersion < pReader->verRange.minVer) {
// continue;
// }
STableBlockScanInfo
*
pScanInfo
=
p
;
if
(
pScanInfo
->
pBlockList
==
NULL
)
{
pScanInfo
->
pBlockList
=
taosArrayInit
(
16
,
sizeof
(
SBlock
));
...
...
@@ -647,6 +561,9 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
taosArrayPush
(
pIndexList
,
pBlockIdx
);
}
int64_t
et2
=
taosGetTimestampUs
();
tsdbDebug
(
"load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%d bytes %s"
,
(
int32_t
)
num
,
(
et1
-
st
)
/
1000
.
0
,
(
et2
-
et1
)
/
1000
.
0
,
num
*
sizeof
(
SBlockIdx
),
pReader
->
idStr
);
_end:
taosArrayDestroy
(
aBlockIdx
);
return
code
;
...
...
@@ -655,9 +572,11 @@ _end:
static
int32_t
doLoadFileBlock
(
STsdbReader
*
pReader
,
SArray
*
pIndexList
,
uint32_t
*
numOfValidTables
,
int32_t
*
numOfBlocks
)
{
size_t
numOfTables
=
taosArrayGetSize
(
pIndexList
);
*
numOfValidTables
=
0
;
int64_t
st
=
taosGetTimestampUs
();
size_t
size
=
0
;
STableBlockScanInfo
*
px
=
NULL
;
while
(
1
)
{
px
=
taosHashIterate
(
pReader
->
status
.
pTableMap
,
px
);
...
...
@@ -675,6 +594,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
tMapDataReset
(
&
mapData
);
tsdbReadBlock
(
pReader
->
pFileReader
,
pBlockIdx
,
&
mapData
,
NULL
);
size
+=
mapData
.
nData
;
STableBlockScanInfo
*
pScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockIdx
->
uid
,
sizeof
(
int64_t
));
for
(
int32_t
j
=
0
;
j
<
mapData
.
nItem
;
++
j
)
{
SBlock
block
=
{
0
};
...
...
@@ -706,6 +627,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
}
}
int64_t
et
=
taosGetTimestampUs
();
tsdbDebug
(
"load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s"
,
numOfTables
,
*
numOfBlocks
,
*
numOfValidTables
,
size
/
1000
.
0
,
(
et
-
st
)
/
1000
.
0
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -816,7 +741,6 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
return
TSDB_CODE_SUCCESS
;
}
// todo consider the output buffer size
static
int32_t
doLoadFileBlockData
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
)
{
int64_t
st
=
taosGetTimestampUs
();
...
...
@@ -853,346 +777,6 @@ _error:
return
code
;
}
// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
// int firstPos, lastPos, midPos = -1;
// int numOfRows;
// TSKEY* keyList;
// assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
// if (num <= 0) return -1;
// keyList = (TSKEY*)pValue;
// firstPos = 0;
// lastPos = num - 1;
// if (order == TSDB_ORDER_DESC) {
// // find the first position which is smaller than the key
// while (1) {
// if (key >= keyList[lastPos]) return lastPos;
// if (key == keyList[firstPos]) return firstPos;
// if (key < keyList[firstPos]) return firstPos - 1;
// numOfRows = lastPos - firstPos + 1;
// midPos = (numOfRows >> 1) + firstPos;
// if (key < keyList[midPos]) {
// lastPos = midPos - 1;
// } else if (key > keyList[midPos]) {
// firstPos = midPos + 1;
// } else {
// break;
// }
// }
// } else {
// // find the first position which is bigger than the key
// while (1) {
// if (key <= keyList[firstPos]) return firstPos;
// if (key == keyList[lastPos]) return lastPos;
// if (key > keyList[lastPos]) {
// lastPos = lastPos + 1;
// if (lastPos >= num)
// return -1;
// else
// return lastPos;
// }
// numOfRows = lastPos - firstPos + 1;
// midPos = (numOfRows >> 1) + firstPos;
// if (key < keyList[midPos]) {
// lastPos = midPos - 1;
// } else if (key > keyList[midPos]) {
// firstPos = midPos + 1;
// } else {
// break;
// }
// }
// }
// return midPos;
// }
// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// if (cur->rows > 0) {
// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
// assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
// } else {
// assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
// }
// SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
// assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
// cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
// } else {
// cur->win = pTsdbReadHandle->window;
// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
// cur->lastKey = pTsdbReadHandle->window.ekey + step;
// }
// }
// static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo,
// SDataBlockInfo* pBlockInfo, int32_t endPos) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
// TSKEY* tsArray = pCols->cols[0].pData;
// bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
// int32_t step = ascScan ? 1 : -1;
// int32_t start = cur->pos;
// int32_t end = endPos;
// if (!ascScan) {
// TSWAP(start, end);
// }
// assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
// int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);
// // the time window should always be ascending order: skey <= ekey
// cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
// cur->mixBlock = (numOfRows != pBlockInfo->rows);
// cur->lastKey = tsArray[endPos] + step;
// cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0));
// // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
// int32_t pos = endPos + step;
// updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
// doCheckGeneratedBlockRange(pTsdbReadHandle);
// tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
// pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
// pTsdbReadHandle->idStr);
// }
// // only return the qualified data to client in terms of query time window, data rows in the same block but do not
// // be included in the query time window will be discarded
// static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, SBlock* pBlock) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
// STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
// initTableMemIterator(pTsdbReadHandle, pCheckInfo);
// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
// assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
// cur->pos >= 0 && cur->pos < pBlock->numOfRows);
// // Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData
// interface. TSKEY* tsArray = pCols->cols[0].pData; assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] ==
// pBlock->minKey.ts &&
// tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts);
// bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
// int32_t step = ascScan ? 1 : -1;
// // for search the endPos, so the order needs to reverse
// int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
// int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
// int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
// STimeWindow* pWin = &blockInfo.window;
// tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
// " rows:%d, start:%d, end:%d, %s",
// pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos,
// pTsdbReadHandle->idStr);
// // compared with the data from in-memory buffer, to generate the correct timestamp array list
// int32_t numOfRows = 0;
// int32_t curRow = 0;
// int16_t rv1 = -1;
// int16_t rv2 = -1;
// STSchema* pSchema1 = NULL;
// STSchema* pSchema2 = NULL;
// int32_t pos = cur->pos;
// cur->win = TSWINDOW_INITIALIZER;
// bool adjustPos = false;
// // no data in buffer, load data from file directly
// if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
// copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
// return;
// } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
// SSkipListNode* node = NULL;
// TSKEY lastKeyAppend = TSKEY_INITIAL_VAL;
// do {
// STSRow* row2 = NULL;
// STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX);
// if (row1 == NULL) {
// break;
// }
// TSKEY key = TD_ROW_KEY(row1);
// if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
// break;
// }
// if (adjustPos) {
// if (key == lastKeyAppend) {
// pos -= step;
// }
// adjustPos = false;
// }
// if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
// ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
// break;
// }
// if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
// if (rv1 != TD_ROW_SVER(row1)) {
// // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
// rv1 = TD_ROW_SVER(row1);
// }
// if (row2 && rv2 != TD_ROW_SVER(row2)) {
// // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
// rv2 = TD_ROW_SVER(row2);
// }
// numOfRows +=
// mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
// pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// if (cur->win.skey == TSKEY_INITIAL_VAL) {
// cur->win.skey = key;
// }
// cur->win.ekey = key;
// cur->lastKey = key + step;
// cur->mixBlock = true;
// moveToNextRowInMem(pCheckInfo);
// } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
// if (TD_SUPPORT_UPDATE(pCfg->update)) {
// if (lastKeyAppend != key) {
// if (lastKeyAppend != TSKEY_INITIAL_VAL) {
// ++curRow;
// }
// lastKeyAppend = key;
// }
// // load data from file firstly
// numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
// if (rv1 != TD_ROW_SVER(row1)) {
// rv1 = TD_ROW_SVER(row1);
// }
// if (row2 && rv2 != TD_ROW_SVER(row2)) {
// rv2 = TD_ROW_SVER(row2);
// }
// // still assign data into current row
// numOfRows +=
// mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
// pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// if (cur->win.skey == TSKEY_INITIAL_VAL) {
// cur->win.skey = key;
// }
// cur->win.ekey = key;
// cur->lastKey = key + step;
// cur->mixBlock = true;
// moveToNextRowInMem(pCheckInfo);
// pos += step;
// adjustPos = true;
// } else {
// // discard the memory record
// moveToNextRowInMem(pCheckInfo);
// }
// } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
// if (cur->win.skey == TSKEY_INITIAL_VAL) {
// cur->win.skey = tsArray[pos];
// }
// int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
// assert(end != -1);
// if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
// #if 0
// if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
// moveToNextRowInMem(pCheckInfo);
// } else {
// end -= step;
// }
// #endif
// if (!TD_SUPPORT_UPDATE(pCfg->update)) {
// moveToNextRowInMem(pCheckInfo);
// } else {
// end -= step;
// }
// }
// int32_t qstart = 0, qend = 0;
// getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
// if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
// ++curRow;
// }
// numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
// pos += (qend - qstart + 1) * step;
// if (numOfRows > 0) {
// curRow = numOfRows - 1;
// }
// cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
// cur->lastKey = cur->win.ekey + step;
// lastKeyAppend = cur->win.ekey;
// }
// } while (numOfRows < pTsdbReadHandle->outputCapacity);
// if (numOfRows < pTsdbReadHandle->outputCapacity) {
// /**
// * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
// * copy them all to result buffer, since it may be overlapped with file data block.
// */
// if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan)
// ||
// ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
// // no data in cache or data in cache is greater than the ekey of time window, load data from file block
// if (cur->win.skey == TSKEY_INITIAL_VAL) {
// cur->win.skey = tsArray[pos];
// }
// int32_t start = -1, end = -1;
// getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);
// numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
// pos += (end - start + 1) * step;
// cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
// cur->lastKey = cur->win.ekey + step;
// cur->mixBlock = true;
// }
// }
// }
// cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
// ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
// if (!ascScan) {
// TSWAP(cur->win.skey, cur->win.ekey);
// }
// updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
// doCheckGeneratedBlockRange(pTsdbReadHandle);
// tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
// pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
// pTsdbReadHandle->idStr);
// }
static
void
cleanupBlockOrderSupporter
(
SBlockOrderSupporter
*
pSup
)
{
taosMemoryFreeClear
(
pSup
->
numOfBlocksPerTable
);
taosMemoryFreeClear
(
pSup
->
indexPerTable
);
...
...
@@ -1252,8 +836,9 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
// access data blocks according to the offset of each block in asc/desc order.
int32_t
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pReader
->
status
.
pTableMap
);
SBlockOrderSupporter
sup
=
{
0
}
;
int64_t
st
=
taosGetTimestampUs
()
;
SBlockOrderSupporter
sup
=
{
0
};
int32_t
code
=
initBlockOrderSupporter
(
&
sup
,
numOfTables
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -1302,11 +887,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
SFileDataBlockInfo
blockInfo
=
{.
uid
=
sup
.
pDataBlockInfo
[
0
][
i
].
uid
,
.
tbBlockIdx
=
i
};
taosArrayPush
(
pBlockIter
->
blockList
,
&
blockInfo
);
}
tsdbDebug
(
"%p create blocks info struct completed for one table, %d blocks not sorted %s"
,
pReader
,
cnt
,
pReader
->
idStr
);
pBlockIter
->
index
=
asc
?
0
:
(
numOfBlocks
-
1
);
int64_t
et
=
taosGetTimestampUs
();
tsdbDebug
(
"%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s"
,
pReader
,
cnt
,
(
et
-
st
)
/
1000
.
0
,
pReader
->
idStr
);
pBlockIter
->
index
=
asc
?
0
:
(
numOfBlocks
-
1
);
cleanupBlockOrderSupporter
(
&
sup
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1340,7 +926,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
tMergeTreeAdjust
(
pTree
,
tMergeTreeGetAdjustIndex
(
pTree
));
}
tsdbDebug
(
"%p %d data blocks sort completed, %s"
,
pReader
,
cnt
,
pReader
->
idStr
);
int64_t
et
=
taosGetTimestampUs
();
tsdbDebug
(
"%p %d data blocks access order completed, elapsed time:%.2f ms %s"
,
pReader
,
cnt
,
(
et
-
st
)
/
1000
.
0
,
pReader
->
idStr
);
cleanupBlockOrderSupporter
(
&
sup
);
taosMemoryFree
(
pTree
);
...
...
@@ -1813,6 +1400,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
SBlockData
*
pBlockData
=
&
pReader
->
status
.
fileBlockData
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
1
:
-
1
;
int64_t
st
=
taosGetTimestampUs
();
while
(
1
)
{
// todo check the validate of row in file block
{
...
...
@@ -1851,10 +1440,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
blockDataUpdateTsWindow
(
pResBlock
,
0
);
setComposedBlockFlag
(
pReader
,
true
);
int64_t
et
=
taosGetTimestampUs
();
tsdbDebug
(
"%p uid:%"
PRIu64
", composed data block created, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d, %s"
,
pReader
,
tsdbDebug
(
"%p uid:%"
PRIu64
", composed data block created, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d,
elapsed time:%.2f ms
%s"
,
pReader
,
pBlockScanInfo
->
uid
,
pResBlock
->
info
.
window
.
skey
,
pResBlock
->
info
.
window
.
ekey
,
pResBlock
->
info
.
rows
,
pReader
->
idStr
);
(
et
-
st
)
/
1000
.
0
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2031,7 +1621,9 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
static
int32_t
moveToNextFile
(
STsdbReader
*
pReader
,
int32_t
*
numOfBlocks
)
{
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
SArray
*
pIndexList
=
taosArrayInit
(
4
,
sizeof
(
SBlockIdx
));
size_t
numOfTables
=
taosHashGetSize
(
pReader
->
status
.
pTableMap
);
SArray
*
pIndexList
=
taosArrayInit
(
numOfTables
,
sizeof
(
SBlockIdx
));
while
(
1
)
{
bool
hasNext
=
filesetIteratorNext
(
&
pStatus
->
fileIter
,
pReader
);
...
...
@@ -2803,24 +2395,57 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
// ====================================== EXPOSED APIs ======================================
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
SArray
*
pTableList
,
STsdbReader
**
ppReader
,
const
char
*
idstr
)
{
int32_t
code
=
tsdbReaderCreate
(
pVnode
,
pCond
,
ppReader
,
idstr
);
if
(
code
)
{
int32_t
code
=
tsdbReaderCreate
(
pVnode
,
pCond
,
ppReader
,
4096
,
idstr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_err
;
}
if
(
pCond
->
suid
!=
0
)
{
(
*
ppReader
)
->
pSchema
=
metaGetTbTSchema
((
*
ppReader
)
->
pTsdb
->
pVnode
->
pMeta
,
(
*
ppReader
)
->
suid
,
-
1
);
}
else
if
(
taosArrayGetSize
(
pTableList
)
>
0
)
{
STableKeyInfo
*
pKey
=
taosArrayGet
(
pTableList
,
0
);
(
*
ppReader
)
->
pSchema
=
metaGetTbTSchema
((
*
ppReader
)
->
pTsdb
->
pVnode
->
pMeta
,
pKey
->
uid
,
-
1
);
}
// check for query time window
STsdbReader
*
pReader
=
*
ppReader
;
if
(
isEmptyQueryTimeWindow
(
&
pReader
->
window
))
{
tsdbDebug
(
"%p query window not overlaps with the data set, no result returned, %s"
,
pReader
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
}
if
(
pCond
->
type
==
TIMEWINDOW_RANGE_EXTERNAL
)
{
// update the SQueryTableDataCond to create inner reader
STimeWindow
w
=
pCond
->
twindows
;
int32_t
order
=
pCond
->
order
;
if
(
order
==
TSDB_ORDER_ASC
)
{
pCond
->
twindows
.
ekey
=
pCond
->
twindows
.
skey
;
pCond
->
twindows
.
skey
=
INT64_MIN
;
pCond
->
order
=
TSDB_ORDER_DESC
;
}
else
{
pCond
->
twindows
.
skey
=
pCond
->
twindows
.
ekey
;
pCond
->
twindows
.
ekey
=
INT64_MAX
;
pCond
->
order
=
TSDB_ORDER_ASC
;
}
code
=
tsdbReaderCreate
(
pVnode
,
pCond
,
&
pReader
->
innerReader
[
0
],
1
,
idstr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_err
;
}
if
(
order
==
TSDB_ORDER_ASC
)
{
pCond
->
twindows
.
skey
=
w
.
ekey
;
pCond
->
twindows
.
ekey
=
INT64_MAX
;
}
else
{
pCond
->
twindows
.
skey
=
INT64_MIN
;
pCond
->
twindows
.
ekey
=
w
.
ekey
;
}
code
=
tsdbReaderCreate
(
pVnode
,
pCond
,
&
pReader
->
innerReader
[
1
],
1
,
idstr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_err
;
}
}
if
(
pCond
->
suid
!=
0
)
{
pReader
->
pSchema
=
metaGetTbTSchema
(
pReader
->
pTsdb
->
pVnode
->
pMeta
,
pReader
->
suid
,
-
1
);
}
else
if
(
taosArrayGetSize
(
pTableList
)
>
0
)
{
STableKeyInfo
*
pKey
=
taosArrayGet
(
pTableList
,
0
);
pReader
->
pSchema
=
metaGetTbTSchema
(
pReader
->
pTsdb
->
pVnode
->
pMeta
,
pKey
->
uid
,
-
1
);
}
int32_t
numOfTables
=
taosArrayGetSize
(
pTableList
);
pReader
->
status
.
pTableMap
=
createDataBlockScanInfo
(
pReader
,
pTableList
->
pData
,
numOfTables
);
if
(
pReader
->
status
.
pTableMap
==
NULL
)
{
...
...
@@ -2831,21 +2456,41 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
goto
_err
;
}
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
code
=
tsdbTakeReadSnap
(
pReader
->
pTsdb
,
&
pReader
->
pReadSnap
);
if
(
code
)
goto
_err
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_err
;
}
i
nitFilesetIterator
(
&
pReader
->
status
.
fileIter
,
(
*
ppReader
)
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
->
order
,
pReader
->
idStr
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
)
;
i
f
(
pReader
->
type
==
TIMEWINDOW_RANGE_CONTAINED
)
{
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
// no data in files, let's try buffer in memory
if
(
pReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
pReader
->
status
.
loadFromFile
=
false
;
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pReader
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
->
order
,
pReader
->
idStr
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
// no data in files, let's try buffer in memory
if
(
pReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
pReader
->
status
.
loadFromFile
=
false
;
}
else
{
code
=
initForFirstBlockInFile
(
pReader
,
pBlockIter
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
}
else
{
code
=
initForFirstBlockInFile
(
pReader
,
pBlockIter
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
STsdbReader
*
pPrevReader
=
pReader
->
innerReader
[
0
];
SDataBlockIter
*
pBlockIter
=
&
pPrevReader
->
status
.
blockIter
;
initFilesetIterator
(
&
pPrevReader
->
status
.
fileIter
,
pPrevReader
->
pReadSnap
->
fs
.
aDFileSet
,
pPrevReader
->
order
,
pPrevReader
->
idStr
);
resetDataBlockIterator
(
&
pPrevReader
->
status
.
blockIter
,
pPrevReader
->
order
);
// no data in files, let's try buffer in memory
if
(
pPrevReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
pPrevReader
->
status
.
loadFromFile
=
false
;
}
else
{
code
=
initForFirstBlockInFile
(
pPrevReader
,
pBlockIter
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
}
...
...
@@ -2885,20 +2530,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
}
#if 0
// if (pReader->status.pTableScanInfo != NULL) {
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
// }
// tsdbDestroyReadH(&pReader->rhelper);
// tdFreeDataCols(pReader->pDataCols);
// pReader->pDataCols = NULL;
//
// pReader->prev = doFreeColumnInfoData(pReader->prev);
// pReader->next = doFreeColumnInfoData(pReader->next);
#endif
SIOCostSummary
*
pCost
=
&
pReader
->
cost
;
tsdbDebug
(
"%p :io-cost summary: head-file read cnt:%"
PRIu64
", head-file time:%"
PRIu64
" us, statis-info:%"
PRId64
...
...
@@ -2911,55 +2542,100 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFreeClear
(
pReader
);
}
bool
tsdbNextDataBlock
(
STsdbReader
*
pReader
)
{
if
(
isEmptyQueryTimeWindow
(
&
pReader
->
window
))
{
return
false
;
}
static
bool
doTsdbNextDataBlock
(
STsdbReader
*
pReader
)
{
// cleanup the data that belongs to the previous data block
SSDataBlock
*
pBlock
=
pReader
->
pResBlock
;
blockDataCleanup
(
pBlock
);
int64_t
stime
=
taosGetTimestampUs
();
int64_t
elapsedTime
=
stime
;
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
if
(
pReader
->
type
==
BLOCK_LOAD_OFFSET_ORDER
)
{
if
(
pStatus
->
loadFromFile
)
{
int32_t
code
=
buildBlockFromFiles
(
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
false
;
}
if
(
pStatus
->
loadFromFile
)
{
int32_t
code
=
buildBlockFromFiles
(
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
false
;
}
if
(
pBlock
->
info
.
rows
>
0
)
{
return
true
;
}
else
{
buildBlockFromBufferSequentially
(
pReader
);
return
pBlock
->
info
.
rows
>
0
;
}
}
else
{
// no data in files, let's try the buffer
if
(
pBlock
->
info
.
rows
>
0
)
{
return
true
;
}
else
{
buildBlockFromBufferSequentially
(
pReader
);
return
pBlock
->
info
.
rows
>
0
;
}
}
else
if
(
pReader
->
type
==
BLOCK_LOAD_TABLESEQ_ORDER
)
{
}
else
if
(
pReader
->
type
==
BLOCK_LOAD_EXTERN_ORDER
)
{
}
else
{
ASSERT
(
0
);
}
else
{
// no data in files, let's try the buffer
buildBlockFromBufferSequentially
(
pReader
);
return
pBlock
->
info
.
rows
>
0
;
}
return
false
;
}
void
tsdbRetrieveDataBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
)
{
bool
tsdbNextDataBlock
(
STsdbReader
*
pReader
)
{
if
(
isEmptyQueryTimeWindow
(
&
pReader
->
window
))
{
return
false
;
}
if
(
pReader
->
innerReader
[
0
]
!=
NULL
)
{
bool
ret
=
doTsdbNextDataBlock
(
pReader
->
innerReader
[
0
]);
if
(
ret
)
{
pReader
->
step
=
EXTERNAL_ROWS_PREV
;
return
ret
;
}
tsdbReaderClose
(
pReader
->
innerReader
[
0
]);
pReader
->
innerReader
[
0
]
=
NULL
;
}
pReader
->
step
=
EXTERNAL_ROWS_MAIN
;
bool
ret
=
doTsdbNextDataBlock
(
pReader
);
if
(
ret
)
{
return
ret
;
}
if
(
pReader
->
innerReader
[
1
]
!=
NULL
)
{
bool
ret1
=
doTsdbNextDataBlock
(
pReader
->
innerReader
[
1
]);
if
(
ret1
)
{
pReader
->
step
=
EXTERNAL_ROWS_NEXT
;
return
ret1
;
}
tsdbReaderClose
(
pReader
->
innerReader
[
1
]);
pReader
->
innerReader
[
1
]
=
NULL
;
}
return
false
;
}
static
void
setBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
)
{
ASSERT
(
pDataBlockInfo
!=
NULL
&&
pReader
!=
NULL
);
pDataBlockInfo
->
rows
=
pReader
->
pResBlock
->
info
.
rows
;
pDataBlockInfo
->
uid
=
pReader
->
pResBlock
->
info
.
uid
;
pDataBlockInfo
->
window
=
pReader
->
pResBlock
->
info
.
window
;
}
void
tsdbRetrieveDataBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
)
{
if
(
pReader
->
type
==
TIMEWINDOW_RANGE_EXTERNAL
)
{
if
(
pReader
->
step
==
EXTERNAL_ROWS_MAIN
)
{
setBlockInfo
(
pReader
,
pDataBlockInfo
);
}
else
if
(
pReader
->
step
==
EXTERNAL_ROWS_PREV
)
{
setBlockInfo
(
pReader
->
innerReader
[
0
],
pDataBlockInfo
);
}
else
{
setBlockInfo
(
pReader
->
innerReader
[
1
],
pDataBlockInfo
);
}
}
else
{
setBlockInfo
(
pReader
,
pDataBlockInfo
);
}
}
int32_t
tsdbRetrieveDatablockSMA
(
STsdbReader
*
pReader
,
SColumnDataAgg
***
pBlockStatis
,
bool
*
allHave
)
{
int32_t
code
=
0
;
*
allHave
=
false
;
if
(
pReader
->
type
==
TIMEWINDOW_RANGE_EXTERNAL
)
{
*
pBlockStatis
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
// there is no statistics data for composed block
if
(
pReader
->
status
.
composedDataBlock
)
{
*
pBlockStatis
=
NULL
;
...
...
@@ -3029,7 +2705,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
return
code
;
}
SArray
*
tsdbRetrieveDataBlock
(
STsdbReader
*
pReader
,
SArray
*
pIdList
)
{
static
SArray
*
doRetrieveDataBlock
(
STsdbReader
*
pReader
)
{
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
if
(
pStatus
->
composedDataBlock
)
{
...
...
@@ -3058,16 +2734,27 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
return
pReader
->
pResBlock
->
pDataBlock
;
}
SArray
*
tsdbRetrieveDataBlock
(
STsdbReader
*
pReader
,
SArray
*
pIdList
)
{
if
(
pReader
->
type
==
TIMEWINDOW_RANGE_EXTERNAL
)
{
if
(
pReader
->
step
==
EXTERNAL_ROWS_PREV
)
{
return
doRetrieveDataBlock
(
pReader
->
innerReader
[
0
]);
}
else
if
(
pReader
->
step
==
EXTERNAL_ROWS_NEXT
)
{
return
doRetrieveDataBlock
(
pReader
->
innerReader
[
1
]);
}
}
return
doRetrieveDataBlock
(
pReader
);
}
int32_t
tsdbReaderReset
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
)
{
if
(
isEmptyQueryTimeWindow
(
&
pReader
->
window
))
{
return
TSDB_CODE_SUCCESS
;
}
pReader
->
order
=
pCond
->
order
;
pReader
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pReader
->
type
=
TIMEWINDOW_RANGE_CONTAINED
;
pReader
->
status
.
loadFromFile
=
true
;
pReader
->
status
.
pTableIter
=
NULL
;
pReader
->
window
=
updateQueryTimeWindow
(
pReader
->
pTsdb
,
&
pCond
->
twindows
);
// allocate buffer in order to load data blocks from file
...
...
@@ -3077,10 +2764,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
pReader
->
suppInfo
.
tsColAgg
.
colId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
// todo set the correct numOfTables
int32_t
numOfTables
=
1
;
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
int32_t
numOfTables
=
taosHashGetSize
(
pReader
->
status
.
pTableMap
);
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pReader
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
->
order
,
pReader
->
idStr
);
...
...
@@ -3088,18 +2772,23 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
resetDataBlockScanInfo
(
pReader
->
status
.
pTableMap
);
int32_t
code
=
0
;
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
// no data in files, let's try buffer in memory
if
(
pReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
pReader
->
status
.
loadFromFile
=
false
;
}
else
{
code
=
initForFirstBlockInFile
(
pReader
,
pBlockIter
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbError
(
"%p reset reader failed, numOfTables:%d, query range:%"
PRId64
" - %"
PRId64
" in query %s"
,
pReader
,
numOfTables
,
pReader
->
window
.
skey
,
pReader
->
window
.
ekey
,
pReader
->
idStr
);
return
code
;
}
}
tsdbDebug
(
"%p reset reader, suid:%"
PRIu64
", numOfTables:%d, query range:%"
PRId64
" - %"
PRId64
" in query %s"
,
pReader
,
pReader
->
suid
,
numOfTables
,
pReader
->
window
.
skey
,
pReader
->
window
.
ekey
,
pReader
->
idStr
);
return
code
;
}
...
...
@@ -3190,7 +2879,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
STbData
*
d
=
NULL
;
if
(
pReader
->
pTsdb
->
mem
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
p
Tsdb
->
m
em
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
d
);
tsdbGetTbDataFromMemTable
(
pReader
->
p
ReadSnap
->
pM
em
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
d
);
if
(
d
!=
NULL
)
{
rows
+=
tsdbGetNRowsInTbData
(
d
);
}
...
...
@@ -3198,7 +2887,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
STbData
*
di
=
NULL
;
if
(
pReader
->
pTsdb
->
imem
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
p
Tsdb
->
im
em
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
di
);
tsdbGetTbDataFromMemTable
(
pReader
->
p
ReadSnap
->
pIM
em
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
di
);
if
(
di
!=
NULL
)
{
rows
+=
tsdbGetNRowsInTbData
(
di
);
}
...
...
source/libs/executor/inc/executil.h
浏览文件 @
9bddf99d
...
...
@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
bool
isResultRowClosed
(
SResultRow
*
pResultRow
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
9bddf99d
...
...
@@ -108,7 +108,6 @@ typedef struct STaskCostInfo {
SFileBlockLoadRecorder
*
pRecoder
;
uint64_t
elapsedTime
;
uint64_t
firstStageMergeTime
;
uint64_t
winInfoSize
;
uint64_t
tableInfoSize
;
uint64_t
hashSize
;
...
...
@@ -549,6 +548,7 @@ typedef struct SProjectOperatorInfo {
SLimitInfo
limitInfo
;
bool
mergeDataBlocks
;
SSDataBlock
*
pFinalRes
;
SNode
*
pCondition
;
}
SProjectOperatorInfo
;
typedef
struct
SIndefOperatorInfo
{
...
...
source/libs/executor/src/executil.c
浏览文件 @
9bddf99d
...
...
@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) {
}
}
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
// do nothing
}
bool
isResultRowClosed
(
SResultRow
*
pRow
)
{
return
(
pRow
->
closed
==
true
);
}
void
closeResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
closed
=
true
;
}
...
...
@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
)
{
size_t
numOfCols
=
0
;
if
(
pNodeList
!=
NULL
)
{
numOfCols
=
LIST_LENGTH
(
pNodeList
);
}
else
{
numOfCols
=
0
;
}
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SBlockOrderInfo
));
if
(
pList
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SSlotDescNode
*
pDescNode
=
(
SSlotDescNode
*
)
nodesListGetNode
(
pNode
->
pSlots
,
i
);
/*if (!pDescNode->output) { // todo disable it temporarily*/
/*continue;*/
/*}*/
SColumnInfoData
idata
=
createColumnInfoData
(
pDescNode
->
dataType
.
type
,
pDescNode
->
dataType
.
bytes
,
pDescNode
->
slotId
);
idata
.
info
.
scale
=
pDescNode
->
dataType
.
scale
;
...
...
@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
}
}
#ifdef BUF_PAGE_DEBUG
qDebug
(
"page_setSelect num:%d"
,
num
);
#endif
if
(
p
!=
NULL
)
{
p
->
subsidiaries
.
pCtx
=
pValCtx
;
p
->
subsidiaries
.
num
=
num
;
...
...
@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
// TODO: get it from stable scan node
pCond
->
twindows
=
pTableScanNode
->
scanRange
;
pCond
->
suid
=
pTableScanNode
->
scan
.
suid
;
pCond
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pCond
->
type
=
TIMEWINDOW_RANGE_CONTAINED
;
pCond
->
startVersion
=
-
1
;
pCond
->
endVersion
=
-
1
;
// pCond->type = pTableScanNode->scanFlag;
...
...
@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
}
// get the correct time window according to the handled timestamp
// todo refactor
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
order
)
{
STimeWindow
w
=
{
0
};
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
9bddf99d
...
...
@@ -1665,9 +1665,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
// pSummary->hashSize = hashSize;
// add the merge time
pSummary
->
elapsedTime
+=
pSummary
->
firstStageMergeTime
;
// SResultRowPool* p = pTaskInfo->pool;
// if (p != NULL) {
// pSummary->winInfoSize = getResultRowPoolMemSize(p);
...
...
@@ -1676,17 +1673,16 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// pSummary->winInfoSize = 0;
// pSummary->numOfTimeWindows = 0;
// }
//
// calculateOperatorProfResults(pQInfo);
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%"
PRId64
" us, first merge:%"
PRId64
" us, total blocks:%d,
"
"load block statis:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
,
pSummary
->
firstStageMergeTime
,
pRecorder
->
totalBlock
s
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total rows:%
"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRow
s
,
pRecorder
->
totalCheckedRows
);
}
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
...
...
@@ -3031,7 +3027,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
}
closeAllResultRows
(
&
pAggInfo
->
binfo
.
resultRowInfo
);
initGroupedResultInfo
(
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultRowHashTable
,
0
);
OPTR_SET_OPENED
(
pOperator
);
...
...
@@ -3400,27 +3395,33 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
break
;
}
// no results generated
if
(
pInfo
->
pRes
->
info
.
rows
==
0
||
(
!
pProjectInfo
->
mergeDataBlocks
))
{
break
;
}
if
(
pProjectInfo
->
mergeDataBlocks
)
{
pFinalRes
->
info
.
groupId
=
pInfo
->
pRes
->
info
.
groupId
;
pFinalRes
->
info
.
version
=
pInfo
->
pRes
->
info
.
version
;
if
(
pRes
->
info
.
rows
>
0
)
{
pFinalRes
->
info
.
groupId
=
pRes
->
info
.
groupId
;
pFinalRes
->
info
.
version
=
pRes
->
info
.
version
;
// continue merge data, ignore the group id
blockDataMerge
(
pFinalRes
,
pInfo
->
pRes
);
// continue merge data, ignore the group id
blockDataMerge
(
pFinalRes
,
pRes
);
if
(
pFinalRes
->
info
.
rows
+
pRes
->
info
.
rows
<=
pOperator
->
resultInfo
.
threshold
)
{
continue
;
}
}
if
(
pFinalRes
->
info
.
rows
+
pInfo
->
pRes
->
info
.
rows
<=
pOperator
->
resultInfo
.
threshold
)
{
continue
;
// do apply filter
doFilter
(
pProjectInfo
->
pFilterNode
,
pFinalRes
,
NULL
);
if
(
pFinalRes
->
info
.
rows
>
0
||
pRes
->
info
.
rows
==
0
)
{
break
;
}
}
else
{
// do apply filter
if
(
pRes
->
info
.
rows
>
0
)
{
doFilter
(
pProjectInfo
->
pFilterNode
,
pRes
,
NULL
);
if
(
pRes
->
info
.
rows
==
0
)
{
continue
;
}
}
}
// do apply filter
SSDataBlock
*
p
=
pProjectInfo
->
mergeDataBlocks
?
pFinalRes
:
pRes
;
doFilter
(
pProjectInfo
->
pFilterNode
,
p
,
NULL
);
if
(
p
->
info
.
rows
>
0
)
{
// no results generated
break
;
}
}
...
...
@@ -3884,8 +3885,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initLimitInfo
(
pProjPhyNode
->
node
.
pLimit
,
pProjPhyNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFilterNode
=
pProjPhyNode
->
node
.
pConditions
;
pInfo
->
mergeDataBlocks
=
pProjPhyNode
->
mergeDataBlock
;
...
...
@@ -4416,7 +4416,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
pCond
->
twindows
=
(
STimeWindow
){.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
pCond
->
suid
=
uid
;
pCond
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pCond
->
type
=
TIMEWINDOW_RANGE_CONTAINED
;
pCond
->
startVersion
=
-
1
;
pCond
->
endVersion
=
-
1
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
9bddf99d
...
...
@@ -1057,7 +1057,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
}
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
order
);
OPTR_SET_OPENED
(
pOperator
);
...
...
@@ -1213,7 +1212,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
...
@@ -2008,7 +2006,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
...
@@ -2172,8 +2169,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
blockDataEnsureCapacity
(
pResBlock
,
pOperator
->
resultInfo
.
capacity
);
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
...
...
@@ -2313,10 +2308,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
pInfo
->
pFillColInfo
=
createFillColInfo
(
pExprInfo
,
numOfExprs
,
(
SNodeListNode
*
)
pInterpPhyNode
->
pFillValues
);
pInfo
->
pRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
win
=
pInterpPhyNode
->
timeRange
;
pInfo
->
pRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
win
=
pInterpPhyNode
->
timeRange
;
pInfo
->
interval
.
interval
=
pInterpPhyNode
->
interval
;
pInfo
->
current
=
pInfo
->
win
.
skey
;
pInfo
->
current
=
pInfo
->
win
.
skey
;
pOperator
->
name
=
"TimeSliceOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录