Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dbd8c300
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dbd8c300
编写于
6月 27, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do tsdbread refactor.
上级
ce15ee0e
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
989 addition
and
783 deletion
+989
-783
include/common/tcommon.h
include/common/tcommon.h
+5
-5
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+4
-5
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+974
-760
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+1
-12
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+5
-1
未找到文件。
include/common/tcommon.h
浏览文件 @
dbd8c300
...
...
@@ -101,17 +101,17 @@ typedef struct SColumnInfoData {
}
SColumnInfoData
;
typedef
struct
SQueryTableDataCond
{
// STimeWindow twindow;
uint64_t
suid
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
int32_t
numOfTWindows
;
STimeWindow
*
twindows
;
int64_t
startVersion
;
int64_t
endVersion
;
int32_t
numOfTables
;
// number of tables
uint64_t
*
uidList
;
// table uid list
int64_t
startVersion
;
// start version
int64_t
endVersion
;
// end version
}
SQueryTableDataCond
;
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
dbd8c300
...
...
@@ -116,12 +116,11 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// typedef struct STsdb STsdb;
typedef
struct
STsdbReader
STsdbReader
;
#define BLOCK_LOAD_OFFSET_
SEQ_ORDER
1
#define BLOCK_LOAD_TABLE
_
SEQ_ORDER 2
#define BLOCK_LOAD_
TABLE_RR_ORDER
3
#define BLOCK_LOAD_OFFSET_
ORDER
1
#define BLOCK_LOAD_TABLESEQ_ORDER 2
#define BLOCK_LOAD_
EXTERN_ORDER
3
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
tableInfoGroup
,
uint64_t
qId
,
uint64_t
taskId
,
STsdbReader
**
ppReader
);
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
pTableList
,
uint64_t
qId
,
uint64_t
taskId
,
STsdbReader
**
ppReader
);
void
tsdbReaderClose
(
STsdbReader
*
pReader
);
bool
tsdbNextDataBlock
(
STsdbReader
*
pReader
);
void
tsdbRetrieveDataBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
dbd8c300
...
...
@@ -14,7 +14,6 @@
*/
#include "tsdb.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
...
...
@@ -24,11 +23,6 @@
.rows = (_block)->numOfRows, \
.uid = (_checkInfo)->tableId})
enum
{
TSDB_QUERY_TYPE_ALL
=
1
,
TSDB_QUERY_TYPE_LAST
=
2
,
};
enum
{
TSDB_CACHED_TYPE_NONE
=
0
,
TSDB_CACHED_TYPE_LASTROW
=
1
,
...
...
@@ -41,7 +35,7 @@ typedef struct SQueryFilePos {
int32_t
pos
;
int64_t
lastKey
;
int32_t
rows
;
bool
mix
Block
;
bool
composed
Block
;
bool
blockCompleted
;
STimeWindow
win
;
}
SQueryFilePos
;
...
...
@@ -53,39 +47,37 @@ typedef struct SDataBlockLoadInfo {
SArray
*
pLoadedCols
;
}
SDataBlockLoadInfo
;
typedef
struct
SLoadCompBlockInfo
{
int32_t
tid
;
/* table tid */
int32_t
fileId
;
}
SLoadCompBlockInfo
;
enum
{
CHECKINFO_CHOSEN_MEM
=
0
,
CHECKINFO_CHOSEN_IMEM
=
1
,
CHECKINFO_CHOSEN_BOTH
=
2
// for update=2(merge case)
};
typedef
struct
STableCheckInfo
{
uint64_t
suid
;
uint64_t
tableId
;
typedef
struct
STableBlockScanInfo
{
uint64_t
uid
;
TSKEY
lastKey
;
SBlockIdx
blockIdx
;
SArray
*
pBlockList
;
// block data index list
// SBlockInfo* pCompInfo;
int32_t
compSize
;
int32_t
numOfBlocks
:
29
;
// number of qualified data blocks not the original blocks
//
int32_t compSize;
//
int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks
uint8_t
chosen
:
2
;
// indicate which iterator should move forward
bool
initBuf
:
1
;
// whether to initialize the in-memory skip list iterator or not
STbDataIter
*
iter
;
// mem buffer skip list iterator
STbDataIter
*
iiter
;
// imem buffer skip list iterator
}
STableCheckInfo
;
typedef
struct
STableBlockInfo
{
SBlock
*
compBlock
;
STableCheckInfo
*
pTableCheckInfo
;
}
STableBlockInfo
;
bool
iterInit
;
// whether to initialize the in-memory skip list iterator or not
STbDataIter
iter
;
// mem buffer skip list iterator
STbDataIter
iiter
;
// imem buffer skip list iterator
bool
memHasVal
;
bool
imemHasVal
;
}
STableBlockScanInfo
;
typedef
struct
SBlockOrderWrapper
{
int64_t
uid
;
SBlock
*
pBlock
;
}
SBlockOrderWrapper
;
typedef
struct
SBlockOrderSupporter
{
int32_t
numOfTables
;
S
TableBlockInfo
**
pDataBlockInfo
;
int32_t
*
blockIndexArray
;
S
BlockOrderWrapper
**
pDataBlockInfo
;
int32_t
*
indexPerTable
;
int32_t
*
numOfBlocksPerTable
;
}
SBlockOrderSupporter
;
...
...
@@ -100,47 +92,92 @@ typedef struct SIOCostSummary {
typedef
struct
SBlockLoadSuppInfo
{
SColumnDataAgg
*
pstatis
;
SColumnDataAgg
**
plist
;
SArray
*
defaultLoadColumn
;
// default load column
int16_t
*
colIdList
;
// default load column
int32_t
*
slotIds
;
// colId to slotId
}
SBlockLoadSuppInfo
;
typedef
struct
SFileSetIter
{
int32_t
numOfFiles
;
// number of total files
int32_t
index
;
// current accessed index in the list
SArray
*
pFileList
;
// data file list
}
SFileSetIter
;
typedef
struct
SFileDataBlockInfo
{
int32_t
tbBlockIdx
;
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
int64_t
uid
;
}
SFileDataBlockInfo
;
typedef
struct
SDataBlockIter
{
int32_t
numOfBlocks
;
int32_t
index
;
SArray
*
blockList
;
// SArray<SFileDataBlockInfo>
}
SDataBlockIter
;
typedef
struct
SFileBlockDumpInfo
{
int32_t
totalRows
;
int32_t
rowIndex
;
int64_t
lastKey
;
}
SFileBlockDumpInfo
;
typedef
struct
SComposedDataBlock
{
bool
composed
;
int32_t
rows
;
}
SComposedDataBlock
;
typedef
struct
SReaderStatus
{
SQueryFilePos
cur
;
// current position
int32_t
tableListIndex
;
bool
loadFromFile
;
// check file stage
bool
initStartPos
;
SHashObj
*
pTableMap
;
// SHash<STableBlockScanInfo>
int32_t
realNumOfRows
;
SFileBlockDumpInfo
fBlockDumpInfo
;
SBlockData
fileBlockData
;
SFileSetIter
fileIter
;
SDataBlockIter
blockIter
;
}
SReaderStatus
;
struct
STsdbReader
{
STsdb
*
pTsdb
;
uint64_t
suid
;
int16_t
order
;
SQueryFilePos
cur
;
// current position
STimeWindow
window
;
// the primary query time window that applies to all queries
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
int32_t
numOfBlocks
;
SArray
*
pColumns
;
// SArray<SColumnInfoData>
bool
locateStart
;
int32_t
outputCapacity
;
int32_t
realNumOfRows
;
SArray
*
pTableCheckInfo
;
// SArray<STableCheckInfo>
int32_t
activeIndex
;
bool
checkFiles
;
// check file stage
int8_t
cachelastrow
;
// check if last row cached
bool
loadExternalRow
;
// load time window external data rows
bool
currentLoadExternalRows
;
// current load external rows
int32_t
loadType
;
// block load type
SSDataBlock
*
pResBlock
;
int32_t
capacity
;
SReaderStatus
status
;
char
*
idStr
;
// query info handle, for debug purpose
int32_t
type
;
// query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SDFileSet
*
pFileGroup
;
// SFSIter fileIter;
// SReadH rhelper;
STableBlockInfo
*
pDataBlockInfo
;
SDataCols
*
pDataCols
;
// in order to hold current file data block
int32_t
allocSize
;
// allocated data block size
SDataBlockLoadInfo
dataBlockLoadInfo
;
/* record current block load information */
SLoadCompBlockInfo
compBlockLoadInfo
;
/* record current compblock information in SQueryAttr */
int32_t
type
;
// query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo
suppInfo
;
SArray
*
prev
;
// previous row which is before than time window
SArray
*
next
;
// next row which is after the query time window
SIOCostSummary
cost
;
STSchema
*
pSchema
;
SDataFReader
*
pFileReader
;
int64_t
startVersion
;
int64_t
endVersion
;
#if 0
SFileBlockInfo* pDataBlockInfo;
SDataCols* pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
// SDFileSet* pFileGroup;
// SFSIter fileIter;
// SReadH rhelper;
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
#endif
};
static
SFileDataBlockInfo
*
getCurrentBlockInfo
(
SDataBlockIter
*
pBlockIter
);
static
int
tsdbReadRowsFromCache
(
STableBlockScanInfo
*
pScanInfo
,
TSDBKEY
maxKey
,
int32_t
capacity
,
STsdbReader
*
pReader
);
static
TSDBROW
*
getValidRow
(
STbDataIter
*
pIter
,
bool
*
hasVal
,
STsdbReader
*
pReader
);
static
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SBlockData
*
pData
,
SFileBlockDumpInfo
*
pDumpInfo
,
int64_t
ts
,
SRowMerger
*
pMerger
,
STsdbReader
*
pReader
,
STSRow
**
pRow
);
// static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
// pBlockLoadInfo->slot = -1;
// pBlockLoadInfo->uid = 0;
...
...
@@ -152,70 +189,50 @@ struct STsdbReader {
// pCompBlockLoadInfo->fileId = -1;
// }
// static SArray* getColumnIdList(STsdbReader* pTsdbReadHandle) {
// size_t numOfCols = QH_GET_NUM_OF_COLS(pTsdbReadHandle);
// assert(numOfCols <= TSDB_MAX_COLUMNS);
// SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
// for (int32_t i = 0; i < numOfCols; ++i) {
// SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
// taosArrayPush(pIdList, &pCol->info.colId);
// }
// return pIdList;
// }
// static SArray* getDefaultLoadColumns(STsdbReader* pTsdbReadHandle, bool loadTS) {
// SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle);
// // check if the primary time stamp column needs to load
// int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
// // the primary timestamp column does not be included in the the specified load column list, add it
// if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
// int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
// taosArrayInsert(pLocalIdList, 0, &columnId);
// }
static
int32_t
setColumnIdList
(
STsdbReader
*
pReader
,
SSDataBlock
*
pBlock
)
{
size_t
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
pReader
->
suppInfo
.
colIdList
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
int16_t
));
if
(
pReader
->
suppInfo
.
colIdList
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
// return pLocalIdList;
// }
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
pReader
->
suppInfo
.
colIdList
[
i
]
=
pCol
->
info
.
colId
;
}
static
SArray
*
createCheckInfoFromTableGroup
(
STsdbReader
*
pTsdbReadHandle
,
STableListInfo
*
pTableList
)
{
// size_t tableSize = taosArrayGetSize(pTableList->pTableList);
// assert(tableSize >= 1);
return
TSDB_CODE_SUCCESS
;
}
// // allocate buffer in order to load data blocks from file
// SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
// if (pTableCheckInfo == NULL) {
// return NULL;
// }
static
SHashObj
*
createDataBlockScanInfo
(
STsdbReader
*
pTsdbReader
,
const
uint64_t
*
idList
,
int32_t
numOfTables
)
{
ASSERT
(
numOfTables
>=
1
);
// // todo apply the lastkey of table check to avoid to load header file
// for (int32_t j = 0; j < tableSize; ++j) {
// STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j);
// allocate buffer in order to load data blocks from file
// todo use simple hash instead
SHashObj
*
pTableMap
=
taosHashInit
(
numOfTables
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
if
(
pTableMap
==
NULL
)
{
return
NULL
;
}
// STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
// info.suid = pTsdbReadHandle->suid;
// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
// if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
// info.lastKey = pTsdbReadHandle->window.skey;
// }
// todo apply the lastkey of table check to avoid to load header file
for
(
int32_t
j
=
0
;
j
<
numOfTables
;
++
j
)
{
STableBlockScanInfo
info
=
{.
lastKey
=
0
,
.
uid
=
idList
[
j
]};
if
(
ASCENDING_TRAVERSE
(
pTsdbReader
->
order
))
{
if
(
info
.
lastKey
==
INT64_MIN
||
info
.
lastKey
<
pTsdbReader
->
window
.
skey
)
{
info
.
lastKey
=
pTsdbReader
->
window
.
skey
;
}
// assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle
->window.ekey);
//
} else {
// info.lastKey = pTsdbReadHandle
->window.skey;
//
}
ASSERT
(
info
.
lastKey
>=
pTsdbReader
->
window
.
skey
&&
info
.
lastKey
<=
pTsdbReader
->
window
.
ekey
);
}
else
{
info
.
lastKey
=
pTsdbReader
->
window
.
skey
;
}
// taosArrayPush(pTableCheckInfo, &info);
// tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId,
// info.lastKey,
// pTsdbReadHandle->idStr);
// }
taosHashPut
(
pTableMap
,
&
info
.
uid
,
sizeof
(
uint64_t
),
&
info
,
sizeof
(
info
));
tsdbDebug
(
"%p check table uid:%"
PRId64
" from lastKey:%"
PRId64
" %s"
,
pTsdbReader
,
info
.
uid
,
info
.
lastKey
,
pTsdbReader
->
idStr
);
}
// // TODO group table according to the tag value.
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
// return pTableCheckInfo;
return
NULL
;
return
pTableMap
;
}
// static void resetCheckInfo(STsdbReader* pTsdbReadHandle) {
...
...
@@ -224,7 +241,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReader* pTsdbReadHandle, STabl
// // todo apply the lastkey of table check to avoid to load header file
// for (int32_t i = 0; i < numOfTables; ++i) {
// STable
CheckInfo* pCheckInfo = (STableCheck
Info*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// STable
BlockScanInfo* pCheckInfo = (STableBlockScan
Info*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// pCheckInfo->lastKey = pTsdbReadHandle->window.skey;
// pCheckInfo->iter = tsdbTbDataIterDestroy(pCheckInfo->iter);
// pCheckInfo->iiter = tsdbTbDataIterDestroy(pCheckInfo->iiter);
...
...
@@ -239,24 +256,24 @@ static SArray* createCheckInfoFromTableGroup(STsdbReader* pTsdbReadHandle, STabl
// }
// // only one table, not need to sort again
// static SArray* createCheckInfoFromCheckInfo(STable
Check
Info* pCheckInfo, TSKEY skey, SArray** psTable) {
// SArray* pNew = taosArrayInit(1, sizeof(STable
Check
Info));
// static SArray* createCheckInfoFromCheckInfo(STable
BlockScan
Info* pCheckInfo, TSKEY skey, SArray** psTable) {
// SArray* pNew = taosArrayInit(1, sizeof(STable
BlockScan
Info));
// STable
Check
Info info = {.lastKey = skey};
// STable
BlockScan
Info info = {.lastKey = skey};
// info.tableId = pCheckInfo->tableId;
// taosArrayPush(pNew, &info);
// return pNew;
// }
// static bool emptyQueryTimewindow(STsdbReader* pTsdbReadHandle
) {
// assert(pTsdbReadHandle
!= NULL);
static
bool
isEmptyQueryTimeWindow
(
STsdbReader
*
pTsdbReader
)
{
ASSERT
(
pTsdbReader
!=
NULL
);
// STimeWindow* w = &pTsdbReadHandle
->window;
// bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle
->order);
STimeWindow
*
w
=
&
pTsdbReader
->
window
;
bool
asc
=
ASCENDING_TRAVERSE
(
pTsdbReader
->
order
);
//
return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
//
}
return
((
asc
&&
w
->
skey
>
w
->
ekey
)
||
(
!
asc
&&
w
->
ekey
>
w
->
skey
));
}
// // Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// // the expired data to client, even it is queried already.
...
...
@@ -267,6 +284,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReader* pTsdbReadHandle, STabl
// return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
// }
// todo remove this
static
void
setQueryTimewindow
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
,
int32_t
tWinIdx
)
{
// pReader->window = pCond->twindows[tWinIdx];
...
...
@@ -293,113 +311,140 @@ static void setQueryTimewindow(STsdbReader* pReader, SQueryTableDataCond* pCond,
// }
}
static
int32_t
tsdbReaderCreate
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
uint64_t
qId
,
uint64_t
taskId
,
STsdbReader
**
ppReader
)
{
int32_t
code
=
0
;
STsdbReader
*
pReader
=
NULL
;
static
void
checkResultSize
(
const
SQueryTableDataCond
*
pCond
,
STsdbReader
*
pReader
)
{
int32_t
rowLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfCols
;
++
i
)
{
rowLen
+=
pCond
->
colList
[
i
].
bytes
;
}
// alloc
pReader
=
(
STsdbReader
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pReader
));
if
(
pReader
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
// make sure the output SSDataBlock size be less than 2MB.
int32_t
TWOMB
=
2
*
1024
*
1024
;
if
(
pReader
->
capacity
*
rowLen
>
TWOMB
)
{
pReader
->
capacity
=
TWOMB
/
rowLen
;
}
}
// init file iterator
static
int32_t
initFileIterator
(
SFileSetIter
*
pIter
,
const
STsdbFSState
*
pFState
)
{
pIter
->
index
=
-
1
;
pIter
->
numOfFiles
=
taosArrayGetSize
(
pFState
->
aDFileSet
);
pIter
->
pFileList
=
taosArrayDup
(
pFState
->
aDFileSet
);
return
TSDB_CODE_SUCCESS
;
}
static
void
resetDataBlockIterator
(
SDataBlockIter
*
pIter
)
{
pIter
->
numOfBlocks
=
-
1
;
}
static
bool
nextFilesetIterator
(
SFileSetIter
*
pIter
,
int32_t
order
,
STsdbReader
*
pReader
)
{
if
(
pIter
->
index
>=
pIter
->
numOfFiles
)
{
return
false
;
}
pIter
->
index
+=
1
;
// check file the time range of coverage
STimeWindow
win
=
{
0
};
SDFileSet
*
pDFile
=
(
SDFileSet
*
)
taosArrayGet
(
pIter
->
pFileList
,
pIter
->
index
);
int32_t
code
=
tsdbDataFReaderOpen
(
&
pReader
->
pFileReader
,
pReader
->
pTsdb
,
pDFile
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_err
;
}
pReader
->
pTsdb
=
pVnode
->
pTsdb
;
// TODO: pass in pTsdb directly
pReader
->
suid
=
pCond
->
suid
;
pReader
->
order
=
pCond
->
order
;
pReader
->
loadType
=
pCond
->
type
;
pReader
->
loadExternalRow
=
pCond
->
loadExternalRows
;
pReader
->
currentLoadExternalRows
=
pCond
->
loadExternalRows
;
pReader
->
type
=
TSDB_QUERY_TYPE_ALL
;
pReader
->
cur
.
fid
=
INT32_MIN
;
pReader
->
cur
.
win
=
TSWINDOW_INITIALIZER
;
pReader
->
checkFiles
=
true
;
pReader
->
activeIndex
=
0
;
// current active table index
pReader
->
allocSize
=
0
;
pReader
->
locateStart
=
false
;
pReader
->
outputCapacity
=
4096
;
//((STsdb*)tsdb)->config.maxRowsPerFileBlock;
// char buf[128] = {0};
// snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
// pReadHandle->idStr = strdup(buf);
// // if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) {
// // goto _end;
// // }
setQueryTimewindow
(
pReader
,
pCond
,
0
);
// tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey
);
if
(
pCond
->
numOfCols
>
0
)
{
// int32_t rowLen = 0;
// for (int32_t i = 0; i < pCond->numOfCols; ++i) {
// rowLen += pCond->colList[i].bytes;
// }
// current file are not overlapped with query time window, ignore remain files
if
((
ASCENDING_TRAVERSE
(
order
)
&&
win
.
skey
>
pReader
->
window
.
ekey
)
||
(
!
ASCENDING_TRAVERSE
(
order
)
&&
win
.
ekey
<
pReader
->
window
.
ekey
))
{
tsdbDebug
(
"%p remain files are not qualified for qrange:%"
PRId64
"-%"
PRId64
", ignore, %s"
,
pReader
,
pReader
->
window
.
skey
,
pReader
->
window
.
ekey
,
pReader
->
idStr
);
return
false
;
}
// // make sure the output SSDataBlock size be less than 2MB.
// int32_t TWOMB = 2 * 1024 * 1024;
// if (pReadHandle->outputCapacity * rowLen > TWOMB) {
// pReadHandle->outputCapacity = TWOMB / rowLen;
// }
_err:
return
false
;
}
// // allocate buffer in order to load data blocks from file
// pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
// if (pReadHandle->suppInfo.pstatis == NULL) {
// goto _end;
// }
static
void
initReaderStatus
(
SReaderStatus
*
pStatus
)
{
pStatus
->
cur
.
fid
=
INT32_MIN
;
pStatus
->
cur
.
win
=
TSWINDOW_INITIALIZER
;
pStatus
->
initStartPos
=
false
;
pStatus
->
tableListIndex
=
0
;
// current active table index
pStatus
->
loadFromFile
=
true
;
}
// // todo: use list instead of array?
// pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
// if (pReadHandle->pColumns == NULL) {
// goto _end;
// }
static
int32_t
tsdbReaderCreate
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STsdbReader
**
ppReader
,
const
char
*
idstr
)
{
int32_t
code
=
0
;
STsdbReader
*
pReader
=
(
STsdbReader
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pReader
));
if
(
pReader
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_end
;
}
// for (int32_t i = 0; i < pCond->numOfCols; ++i) {
// SColumnInfoData colInfo = {{0}, 0};
// colInfo.info = pCond->colList[i];
initReaderStatus
(
&
pReader
->
status
);
pReader
->
pTsdb
=
pVnode
->
pTsdb
;
pReader
->
suid
=
pCond
->
suid
;
pReader
->
order
=
pCond
->
order
;
pReader
->
capacity
=
4096
;
pReader
->
idStr
=
strdup
(
idstr
);
pReader
->
startVersion
=
pCond
->
startVersion
;
pReader
->
endVersion
=
pCond
->
endVersion
;
// int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
// if (code != TSDB_CODE_SUCCESS) {
// goto _end;
// }
// todo remove this
setQueryTimewindow
(
pReader
,
pCond
,
0
);
// taosArrayPush(pReadHandle->pColumns, &colInfo);
// }
if
(
pCond
->
numOfCols
>
0
)
{
checkResultSize
(
pCond
,
pReader
);
// pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
// allocate buffer in order to load data blocks from file
pReader
->
suppInfo
.
pstatis
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
SColumnDataAgg
));
if
(
pReader
->
suppInfo
.
pstatis
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_end
;
}
// size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn);
// pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t));
// pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES);
// todo use new api refactor this
pReader
->
pResBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
if
(
pReader
->
pResBlock
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_end
;
}
// pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows
);
// if (pReadHandle->pDataCols == NULL
) {
// tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr)
;
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY
;
// goto _end
;
//
}
pReader
->
pResBlock
->
pDataBlock
=
taosArrayInit
(
pCond
->
numOfCols
,
sizeof
(
SColumnInfoData
)
);
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{{
0
},
0
}
;
colInfo
.
info
=
pCond
->
colList
[
i
]
;
taosArrayPush
(
pReader
->
pResBlock
->
pDataBlock
,
&
colInfo
)
;
}
// tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
// tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);
setColumnIdList
(
pReader
,
pReader
->
pResBlock
);
pReader
->
suppInfo
.
slotIds
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
int32_t
));
pReader
->
suppInfo
.
plist
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
POINTER_BYTES
);
}
// return (STsdbReader*)pReadHandle;
// todo refactor
STsdbFSState
*
pFState
=
pReader
->
pTsdb
->
fs
->
cState
;
initFileIterator
(
&
pReader
->
status
.
fileIter
,
pFState
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
);
//
_end:
// tsdbReaderClose(pReadHandle);
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY
;
// return NULL;
//
no data in files, let's try buffer in memory
if
(
pReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
pReader
->
status
.
loadFromFile
=
false
;
}
*
ppReader
=
pReader
;
return
code
;
_e
rr
:
// tsdbError(""
);
_e
nd
:
tsdbReaderClose
(
pReader
);
*
ppReader
=
NULL
;
return
code
;
}
// static int32_t setCurrentSchema(SVnode* pVnode, STsdbReader* pTsdbReadHandle) {
// STable
Check
Info* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
// STable
BlockScan
Info* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
// int32_t sversion = 1;
...
...
@@ -463,7 +508,7 @@ _err:
// // pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
// pTsdbReadHandle->pTableCheckInfo = NULL; // create
CheckInfoFromTableGroup
(pTsdbReadHandle, groupList, pMeta,
// pTsdbReadHandle->pTableCheckInfo = NULL; // create
DataBlockScanInfo
(pTsdbReadHandle, groupList, pMeta,
// // &pTable);
// if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// // tsdbReaderClose(pTsdbReadHandle);
...
...
@@ -484,7 +529,7 @@ _err:
// return res;
// }
// static bool initTableMemIterator(STsdbReader* pHandle, STable
Check
Info* pCheckInfo) {
// static bool initTableMemIterator(STsdbReader* pHandle, STable
BlockScan
Info* pCheckInfo) {
// if (pCheckInfo->initBuf) {
// return true;
// }
...
...
@@ -566,12 +611,12 @@ _err:
// return true;
// }
// static void destroyTableMemIterator(STable
Check
Info* pCheckInfo) {
// static void destroyTableMemIterator(STable
BlockScan
Info* pCheckInfo) {
// tsdbTbDataIterDestroy(pCheckInfo->iter);
// tsdbTbDataIterDestroy(pCheckInfo->iiter);
// }
// static TSKEY extractFirstTraverseKey(STable
Check
Info* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) {
// static TSKEY extractFirstTraverseKey(STable
BlockScan
Info* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) {
// TSDBROW row = {0};
// STSRow *rmem = NULL, *rimem = NULL;
...
...
@@ -621,7 +666,7 @@ _err:
// }
// }
// static STSRow* getSRowInTableMem(STable
Check
Info* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
// static STSRow* getSRowInTableMem(STable
BlockScan
Info* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
// TDRowVerT maxVer) {
// TSDBROW row;
// STSRow *rmem = NULL, *rimem = NULL;
...
...
@@ -685,7 +730,7 @@ _err:
// }
// }
// static bool moveToNextRowInMem(STable
Check
Info* pCheckInfo) {
// static bool moveToNextRowInMem(STable
BlockScan
Info* pCheckInfo) {
// bool hasNext = false;
// if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
// if (pCheckInfo->iter != NULL) {
...
...
@@ -729,7 +774,7 @@ _err:
// assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
// pHandle->cur.fid = INT32_MIN;
// STable
Check
Info* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
// STable
BlockScan
Info* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
// if (!pCheckInfo->initBuf) {
// initTableMemIterator(pHandle, pCheckInfo);
// }
...
...
@@ -814,188 +859,154 @@ _err:
// return midSlot;
// }
// static int32_t loadBlockInfo(STsdbReader* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
// int32_t code = 0;
// STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
// pCheckInfo->numOfBlocks = 0;
// STable table = {.uid = pCheckInfo->tableId, .suid = pCheckInfo->suid};
// table.pSchema = pTsdbReadHandle->pSchema;
// if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
// code = terrno;
// return code;
// }
// SBlockIdx* compIndex = pTsdbReadHandle->rhelper.pBlkIdx;
// // no data block in this file, try next file
// if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId) {
// return 0; // no data blocks in the file belongs to pCheckInfo->pTable
// }
// if (pCheckInfo->compSize < (int32_t)compIndex->len) {
// assert(compIndex->len > 0);
// char* t = taosMemoryRealloc(pCheckInfo->pCompInfo, compIndex->len);
// if (t == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// code = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return code;
// }
static
int32_t
doLoadBlockIndex
(
STsdbReader
*
pReader
,
SDataFReader
*
pFileReader
,
SArray
*
pIndexList
)
{
int32_t
code
=
0
;
// pCheckInfo->pCompInfo = (SBlockInfo*)t;
// pCheckInfo->compSize = compIndex->len;
// }
SMapData
blockIdxMap
;
tMapDataReset
(
&
blockIdxMap
);
// if (tsdbLoadBlockInfo(&(pTsdbReadHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
// return terrno;
// }
// SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
code
=
tsdbReadBlockIdx
(
pFileReader
,
&
blockIdxMap
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_err
;
}
// TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
if
(
blockIdxMap
.
nItem
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
// assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey &&
// pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
// } else {
// assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey &&
// pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
// }
SBlockIdx
blockIndex
=
{
0
};
for
(
int32_t
i
=
0
;
i
<
blockIdxMap
.
nItem
;
++
i
)
{
code
=
tMapDataGetItemByIdx
(
&
blockIdxMap
,
i
,
&
blockIndex
,
tGetBlockIdx
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_err
;
}
// s = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
// e = TMAX(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
if
(
blockIndex
.
suid
!=
pReader
->
suid
)
{
continue
;
}
// // discard the unqualified data block based on the query time window
// int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
// int32_t end = start;
// this block belongs to a table that is not queried.
void
*
p
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
blockIndex
.
uid
,
sizeof
(
uint64_t
));
if
(
p
==
NULL
)
{
continue
;
}
// if (s > pCompInfo->blocks[start].maxKey.ts) {
// return 0;
// }
if
((
ASCENDING_TRAVERSE
(
pReader
->
order
)
&&
(
blockIndex
.
minKey
>
pReader
->
window
.
ekey
||
blockIndex
.
maxKey
<
pReader
->
window
.
skey
))
||
(
!
ASCENDING_TRAVERSE
(
pReader
->
order
)
&&
(
blockIndex
.
minKey
>
pReader
->
window
.
skey
||
blockIndex
.
maxKey
<
pReader
->
window
.
ekey
)))
{
continue
;
}
// // todo speedup the procedure of located end block
// while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].minKey.ts <= e)
) {
// end += 1
;
//
}
STableBlockScanInfo
*
pScanInfo
=
p
;
if
(
pScanInfo
->
pBlockList
==
NULL
)
{
pScanInfo
->
pBlockList
=
taosArrayInit
(
16
,
sizeof
(
SBlock
))
;
}
// pCheckInfo->numOfBlocks = (end - start);
taosArrayPush
(
pIndexList
,
&
blockIndex
);
}
// if (start > 0) {
// memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
// }
tMapDataClear
(
&
blockIdxMap
);
return
TSDB_CODE_SUCCESS
;
// (*numOfBlocks) += pCheckInfo->numOfBlocks;
// return 0;
// }
_err:
tMapDataClear
(
&
blockIdxMap
);
return
code
;
}
// static int32_t getFileCompInfo(STsdbReader* pTsdbReadHandle, int32_t* numOfBlocks) {
// // load all the comp offset value for all tables in this file
// int32_t code = TSDB_CODE_SUCCESS;
// *numOfBlocks = 0;
static
int32_t
doLoadFileBlock
(
STsdbReader
*
pReader
,
SArray
*
pIndexList
,
uint32_t
*
numOfValidTables
)
{
size_t
numOfTables
=
taosArrayGetSize
(
pIndexList
);
// pTsdbReadHandle->cost.headFileLoad += 1;
// int64_t s = taosGetTimestampUs();
*
numOfValidTables
=
0
;
// size_t numOfTables = 0;
// if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
// code = loadBlockInfo(pTsdbReadHandle, pTsdbReadHandle->activeIndex, numOfBlocks);
// } else if (pTsdbReadHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
// numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
SBlockIdx
*
pBlockIdx
=
taosArrayGet
(
pIndexList
,
i
);
// for (int32_t i = 0; i < numOfTables; ++i) {
// code = loadBlockInfo(pTsdbReadHandle, i, numOfBlocks);
// if (code != TSDB_CODE_SUCCESS) {
// int64_t e = taosGetTimestampUs();
SMapData
mapData
;
tMapDataReset
(
&
mapData
);
tsdbReadBlock
(
pReader
->
pFileReader
,
pBlockIdx
,
&
mapData
,
NULL
);
// pTsdbReadHandle->cost.headFileLoadTime += (e - s);
// return code;
// }
// }
// } else {
// assert(0);
// }
STableBlockScanInfo
*
pScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockIdx
->
uid
,
sizeof
(
int64_t
));
ASSERT
(
pScanInfo
->
pBlockList
==
NULL
||
taosArrayGetSize
(
pScanInfo
->
pBlockList
)
==
0
);
for
(
int32_t
j
=
0
;
j
<
mapData
.
nItem
;
++
j
)
{
SBlock
block
=
{
0
};
// int64_t e = taosGetTimestampUs(
);
// pTsdbReadHandle->cost.headFileLoadTime += (e - s);
//
return code;
//
}
int32_t
code
=
tMapDataGetItemByIdx
(
&
mapData
,
j
,
&
block
,
tGetBlock
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// static int32_t doLoadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
// int32_t slotIndex) {
// int64_t st = taosGetTimestampUs();
if
((
ASCENDING_TRAVERSE
(
pReader
->
order
)
&&
(
block
.
minKey
.
ts
>
pReader
->
window
.
ekey
||
block
.
maxKey
.
ts
<
pReader
->
window
.
skey
))
||
(
!
ASCENDING_TRAVERSE
(
pReader
->
order
)
&&
(
block
.
minKey
.
ts
>
pReader
->
window
.
skey
||
block
.
maxKey
.
ts
<
pReader
->
window
.
ekey
)))
{
continue
;
}
// int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema);
// if (code != TSDB_CODE_SUCCESS) {
// tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _error;
// }
void
*
p
=
taosArrayPush
(
pScanInfo
->
pBlockList
,
&
block
);
if
(
p
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
// code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pTsdbReadHandle->pSchema);
// if (code != TSDB_CODE_SUCCESS) {
// tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _error;
// }
if
(
pScanInfo
->
pBlockList
!=
NULL
&&
taosArrayGetSize
(
pScanInfo
->
pBlockList
)
>
0
)
{
(
*
numOfValidTables
)
+=
1
;
}
}
// code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pTsdbReadHandle->pSchema);
// if (code != TSDB_CODE_SUCCESS) {
// tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _error;
// }
return
TSDB_CODE_SUCCESS
;
}
// int16_t* colIds = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
static
int32_t
doLoadFileBlockData
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
)
{
int64_t
st
=
taosGetTimestampUs
();
int32_t
numOfCols
=
taosArrayGetSize
(
pReader
->
pResBlock
->
pDataBlock
);
// int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
// (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true);
// if (ret != TSDB_CODE_SUCCESS) {
// int32_t c = terrno;
// assert(c != TSDB_CODE_SUCCESS);
// goto _error;
// }
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
pBlockIter
);
SBlock
*
pBlock
=
taosArrayGet
(
pBlockScanInfo
->
pBlockList
,
pFBlock
->
tbBlockIdx
);
// SDataBlockLoadInfo* pBlockLoadInfo = &pTsdbReadHandle->dataBlockLoadInfo;
uint8_t
*
pb
=
NULL
,
*
pb1
=
NULL
;
int32_t
code
=
tsdbReadBlockData
(
pReader
->
pFileReader
,
&
pBlockScanInfo
->
blockIdx
,
pBlock
,
pBlockData
,
/*pReader->suppInfo.colIdList, numOfCols, */
&
pb
,
&
pb1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
// pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
// pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
// pBlockLoadInfo->uid = pCheckInfo->tableId;
/*
int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
(int)(QH_GET_NUM_OF_COLS(pReader)), true);
if (ret != TSDB_CODE_SUCCESS) {
int32_t c = terrno;
assert(c != TSDB_CODE_SUCCESS);
goto _error;
}
// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
// assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
SDataBlockLoadInfo* pBlockLoadInfo = &pReader->dataBlockLoadInfo;
// pBlock->numOfRows = pCols->numOfRows;
pBlockLoadInfo->fileGroup = pReader->pFileGroup;
pBlockLoadInfo->slot = pReader->cur.slot;
pBlockLoadInfo->uid = pCheckInfo->tableId;
// // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before
// 1970-01-01T00:00:00Z if (pBlock->minKey.ts < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
// int64_t* src = pCols->cols[0].pData;
// for (int32_t i = 0; i < pBlock->numOfRows; ++i) {
// src[i] = tdGetKey(src[i]);
// }
// }
SDataCols* pCols = pReader->rhelper.pDCols[0];
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
// int64_t elapsedTime = (taosGetTimestampUs() - st)
;
// pTsdbReadHandle->cost.blockLoadTime += elapsedTime;
pBlock->numOfRows = pCols->numOfRows
;
*/
// tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%"
// PRId64
// " us, %s",
// pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows, elapsedTime,
// pTsdbReadHandle->idStr);
// return TSDB_CODE_SUCCESS;
int64_t
elapsedTime
=
(
taosGetTimestampUs
()
-
st
);
pReader
->
cost
.
blockLoadTime
+=
elapsedTime
;
// _error:
// pBlock->numOfRows = 0;
tsdbDebug
(
"%p load file block into buffer, global index:%d, table index:%d, brange:%"
PRId64
"-%"
PRId64
", rows:%d, minVer:%"
PRId64
", maxVer:%"
PRId64
", elapsed time:%"
PRId64
" us, %s"
,
pReader
,
pBlockIter
->
index
,
pFBlock
->
tbBlockIdx
,
pBlock
->
minKey
.
ts
,
pBlock
->
maxKey
.
ts
,
pBlockData
->
nRow
,
pBlock
->
minVersion
,
pBlock
->
maxVersion
,
elapsedTime
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
// tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s",
// pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows
,
// pTsdbReadHandle
->idStr);
// return terrno
;
//
}
_error:
tsdbError
(
"%p error occurs in loading file block, global index:%d, table index:%d, brange:%"
PRId64
"-%"
PRId64
", rows:%d, %s"
,
pReader
,
pBlockIter
->
index
,
pFBlock
->
tbBlockIdx
,
pBlock
->
minKey
.
ts
,
pBlock
->
maxKey
.
ts
,
pBlock
->
nRow
,
pReader
->
idStr
);
return
code
;
}
// static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STable
Check
Info* pCheckInfo) {
// static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STable
BlockScan
Info* pCheckInfo) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
// SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
...
...
@@ -1041,7 +1052,7 @@ _err:
// }
// // return error, add test cases
// if ((code = doLoadFile
DataBlock
(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// if ((code = doLoadFile
BlockData
(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// return code;
// }
...
...
@@ -1074,7 +1085,7 @@ _err:
// // make sure to only load once
// bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows - 1 && (!ascScan)));
// if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) {
// code = doLoadFile
DataBlock
(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
// code = doLoadFile
BlockData
(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
...
...
@@ -1102,7 +1113,7 @@ _err:
// return code;
// }
// static int32_t loadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STable
Check
Info* pCheckInfo,
// static int32_t loadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STable
BlockScan
Info* pCheckInfo,
// bool* exists) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// int32_t code = TSDB_CODE_SUCCESS;
...
...
@@ -1111,7 +1122,7 @@ _err:
// if (asc) {
// // query ended in/started from current block
// if (pTsdbReadHandle->window.ekey < pBlock->maxKey.ts || pCheckInfo->lastKey > pBlock->minKey.ts) {
// if ((code = doLoadFile
DataBlock
(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// if ((code = doLoadFile
BlockData
(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// *exists = false;
// return code;
// }
...
...
@@ -1135,7 +1146,7 @@ _err:
// }
// } else { // desc order, query ended in current block
// if (pTsdbReadHandle->window.ekey > pBlock->minKey.ts || pCheckInfo->lastKey < pBlock->maxKey.ts) {
// if ((code = doLoadFile
DataBlock
(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// if ((code = doLoadFile
BlockData
(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// *exists = false;
// return code;
// }
...
...
@@ -1531,7 +1542,7 @@ _err:
// }
// }
// static void updateInfoAfterMerge(STsdbReader* pTsdbReadHandle, STable
Check
Info* pCheckInfo, int32_t numOfRows,
// static void updateInfoAfterMerge(STsdbReader* pTsdbReadHandle, STable
BlockScan
Info* pCheckInfo, int32_t numOfRows,
// int32_t endPos) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
...
...
@@ -1562,7 +1573,7 @@ _err:
// }
// }
// static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STable
Check
Info* pCheckInfo,
// static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STable
BlockScan
Info* pCheckInfo,
// SDataBlockInfo* pBlockInfo, int32_t endPos) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
...
...
@@ -1648,7 +1659,7 @@ _err:
// // only return the qualified data to client in terms of query time window, data rows in the same block but do not
// // be included in the query time window will be discarded
// static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STable
Check
Info* pCheckInfo, SBlock* pBlock) {
// static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STable
BlockScan
Info* pCheckInfo, SBlock* pBlock) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
// STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
...
...
@@ -1926,180 +1937,164 @@ _err:
// return midPos;
// }
// static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
// taosMemoryFreeClear(pSupporter->numOfBlocksPerTable);
// taosMemoryFreeClear(pSupporter->blockIndexArray);
// for (int32_t i = 0; i < numOfTables; ++i) {
// STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i];
// taosMemoryFreeClear(pBlockInfo);
// }
// taosMemoryFreeClear(pSupporter->pDataBlockInfo);
// }
// static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
// int32_t leftTableIndex = *(int32_t*)pLeft;
// int32_t rightTableIndex = *(int32_t*)pRight;
// SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
static
void
cleanupBlockOrderSupporter
(
SBlockOrderSupporter
*
pSup
)
{
taosMemoryFreeClear
(
pSup
->
numOfBlocksPerTable
);
taosMemoryFreeClear
(
pSup
->
indexPerTable
);
// int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
// int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];
for
(
int32_t
i
=
0
;
i
<
pSup
->
numOfTables
;
++
i
)
{
SBlockOrderWrapper
*
pBlockInfo
=
pSup
->
pDataBlockInfo
[
i
];
taosMemoryFreeClear
(
pBlockInfo
);
}
// if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
// /* left block is empty */
// return 1;
// } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
// /* right block is empty */
// return -1;
// }
taosMemoryFreeClear
(
pSup
->
pDataBlockInfo
);
}
// STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
// STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex]
;
static
int32_t
initBlockOrderSupporter
(
SBlockOrderSupporter
*
pSup
,
int32_t
numOfTables
)
{
ASSERT
(
numOfTables
>=
1
)
;
// // assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
// #if 0 // TODO: temporarily comment off requested by Dr. Liao
// if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
// pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
// tsdbError("error in header file, two block with same offset:%" PRId64,
// (int64_t)pLeftBlockInfoEx->compBlock->offset);
// }
// #endif
pSup
->
numOfBlocksPerTable
=
taosMemoryCalloc
(
1
,
sizeof
(
int32_t
)
*
numOfTables
);
pSup
->
indexPerTable
=
taosMemoryCalloc
(
1
,
sizeof
(
int32_t
)
*
numOfTables
);
pSup
->
pDataBlockInfo
=
taosMemoryCalloc
(
1
,
POINTER_BYTES
*
numOfTables
);
// return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
// }
// static int32_t createDataBlocksInfo(STsdbReader* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
// size_t size = sizeof(STableBlockInfo) * numOfBlocks;
if
(
pSup
->
numOfBlocksPerTable
==
NULL
||
pSup
->
indexPerTable
==
NULL
||
pSup
->
pDataBlockInfo
==
NULL
)
{
cleanupBlockOrderSupporter
(
pSup
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
// if (pTsdbReadHandle->allocSize < size) {
// pTsdbReadHandle->allocSize = (int32_t)size;
// char* tmp = taosMemoryRealloc(pTsdbReadHandle->pDataBlockInfo, pTsdbReadHandle->allocSize);
// if (tmp == NULL) {
// return TSDB_CODE_TDB_OUT_OF_MEMORY;
// }
return
TSDB_CODE_SUCCESS
;
}
// pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*)tmp;
// }
static
int32_t
fileDataBlockOrderCompar
(
const
void
*
pLeft
,
const
void
*
pRight
,
void
*
param
)
{
int32_t
leftIndex
=
*
(
int32_t
*
)
pLeft
;
int32_t
rightIndex
=
*
(
int32_t
*
)
pRight
;
// memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
// *numOfAllocBlocks = numOfBlocks;
SBlockOrderSupporter
*
pSupporter
=
(
SBlockOrderSupporter
*
)
param
;
// // access data blocks according to the offset of each block in asc/desc order.
// int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo)
;
int32_t
leftTableBlockIndex
=
pSupporter
->
indexPerTable
[
leftIndex
];
int32_t
rightTableBlockIndex
=
pSupporter
->
indexPerTable
[
rightIndex
]
;
// SBlockOrderSupporter sup = {0};
// sup.numOfTables = numOfTables;
// sup.numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
// sup.blockIndexArray = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
// sup.pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);
if
(
leftTableBlockIndex
>
pSupporter
->
numOfBlocksPerTable
[
leftIndex
])
{
/* left block is empty */
return
1
;
}
else
if
(
rightTableBlockIndex
>
pSupporter
->
numOfBlocksPerTable
[
rightIndex
])
{
/* right block is empty */
return
-
1
;
}
// if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
// cleanBlockOrderSupporter(&sup, 0);
// return TSDB_CODE_TDB_OUT_OF_MEMORY;
// }
SBlockOrderWrapper
*
pLeftBlock
=
&
pSupporter
->
pDataBlockInfo
[
leftIndex
][
leftTableBlockIndex
];
SBlockOrderWrapper
*
pRightBlock
=
&
pSupporter
->
pDataBlockInfo
[
rightIndex
][
rightTableBlockIndex
];
// int32_t cnt = 0
;
// int32_t numOfQualTables = 0;
return
pLeftBlock
->
pBlock
->
aSubBlock
[
0
].
offset
>
pRightBlock
->
pBlock
->
aSubBlock
[
0
].
offset
?
1
:
-
1
;
}
// for (int32_t j = 0; j < numOfTables; ++j) {
// STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, j);
// if (pTableCheck->numOfBlocks <= 0) {
// continue;
// }
static
int32_t
initBlockIterator
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
,
int32_t
numOfBlocks
)
{
pBlockIter
->
numOfBlocks
=
numOfBlocks
;
// SBlock* pBlock = pTableCheck->pCompInfo->blocks;
// sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks
;
// access data blocks according to the offset of each block in asc/desc order.
int32_t
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pReader
->
status
.
pTableMap
)
;
// char* buf = taosMemoryMalloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks)
;
// if (buf == NULL) {
// cleanBlockOrderSupporter(&sup, numOfQualTables);
// return TSDB_CODE_TDB_OUT_OF_MEMORY
;
//
}
SBlockOrderSupporter
sup
=
{
0
}
;
int32_t
code
=
initBlockOrderSupporter
(
&
sup
,
numOfTables
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;
int32_t
cnt
=
0
;
void
*
ptr
=
NULL
;
while
(
1
)
{
ptr
=
taosHashIterate
(
pReader
->
status
.
pTableMap
,
&
ptr
);
if
(
ptr
==
NULL
)
{
break
;
}
// for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
// STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];
STableBlockScanInfo
*
pTableScanInfo
=
(
STableBlockScanInfo
*
)
ptr
;
if
(
pTableScanInfo
->
pBlockList
==
NULL
||
taosArrayGetSize
(
pTableScanInfo
->
pBlockList
)
==
0
)
{
continue
;
}
// pBlockInfo->compBlock = &pBlock[k];
// pBlockInfo->pTableCheckInfo = pTableCheck;
// cnt++;
// }
size_t
num
=
taosArrayGetSize
(
pTableScanInfo
->
pBlockList
);
sup
.
numOfBlocksPerTable
[
sup
.
numOfTables
]
=
num
;
// numOfQualTables++;
// }
char
*
buf
=
taosMemoryMalloc
(
sizeof
(
SBlockOrderWrapper
)
*
num
);
if
(
buf
==
NULL
)
{
cleanupBlockOrderSupporter
(
&
sup
);
return
TSDB_CODE_TDB_OUT_OF_MEMORY
;
}
// assert(numOfBlocks == cnt);
sup
.
pDataBlockInfo
[
sup
.
numOfTables
]
=
(
SBlockOrderWrapper
*
)
buf
;
for
(
int32_t
k
=
0
;
k
<
num
;
++
k
)
{
SBlockOrderWrapper
wrapper
=
{
0
};
wrapper
.
pBlock
=
(
SBlock
*
)
taosArrayGet
(
pTableScanInfo
->
pBlockList
,
k
);
wrapper
.
uid
=
pTableScanInfo
->
uid
;
// // since there is only one table qualified, blocks are not sorted
// if (numOfQualTables == 1) {
// memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
// cleanBlockOrderSupporter(&sup, numOfQualTables);
sup
.
pDataBlockInfo
[
sup
.
numOfTables
][
k
]
=
wrapper
;
cnt
++
;
}
// tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle,
// cnt,
// pTsdbReadHandle->idStr);
// return TSDB_CODE_SUCCESS;
// }
sup
.
numOfTables
+=
1
;
}
// tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
// numOfQualTables, pTsdbReadHandle->idStr);
ASSERT
(
numOfBlocks
==
cnt
);
// assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
// sup.numOfTables = numOfQualTables;
// since there is only one table qualified, blocks are not sorted
if
(
sup
.
numOfTables
==
1
)
{
for
(
int32_t
i
=
0
;
i
<
numOfBlocks
;
++
i
)
{
SFileDataBlockInfo
blockInfo
=
{.
uid
=
sup
.
pDataBlockInfo
[
0
][
i
].
uid
,
.
tbBlockIdx
=
i
};
taosArrayPush
(
pBlockIter
->
blockList
,
&
blockInfo
);
}
tsdbDebug
(
"%p create blocks info struct completed for one table, %d blocks not sorted %s"
,
pReader
,
cnt
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
}
// SMultiwayMergeTreeInfo* pTree = NULL;
// uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
// if (ret != TSDB_CODE_SUCCESS) {
// cleanBlockOrderSupporter(&sup, numOfTables);
// return TSDB_CODE_TDB_OUT_OF_MEMORY;
// }
tsdbDebug
(
"%p create data blocks info struct completed, %d blocks in %d tables %s"
,
pReader
,
cnt
,
sup
.
numOfTables
,
pReader
->
idStr
);
assert
(
cnt
<=
numOfBlocks
&&
sup
.
numOfTables
<=
numOfTables
);
// the pTableQueryInfo[j]->numOfBlocks may be 0
// int32_t numOfTotal = 0
;
SMultiwayMergeTreeInfo
*
pTree
=
NULL
;
// while (numOfTotal < cnt) {
// int32_t pos = tMergeTreeGetChosenIndex(pTree);
// int32_t index = sup.blockIndexArray[pos]++;
uint8_t
ret
=
tMergeTreeCreate
(
&
pTree
,
sup
.
numOfTables
,
&
sup
,
fileDataBlockOrderCompar
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
cleanupBlockOrderSupporter
(
&
sup
);
return
TSDB_CODE_TDB_OUT_OF_MEMORY
;
}
// STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
// pTsdbReadHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];
int32_t
numOfTotal
=
0
;
while
(
numOfTotal
<
cnt
)
{
int32_t
pos
=
tMergeTreeGetChosenIndex
(
pTree
);
int32_t
index
=
sup
.
indexPerTable
[
pos
]
++
;
// // set data block index overflow, in order to disable the offset comparator
// if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
// sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
// }
SFileDataBlockInfo
blockInfo
=
{.
uid
=
sup
.
pDataBlockInfo
[
pos
][
index
].
uid
,
.
tbBlockIdx
=
index
};
taosArrayPush
(
pBlockIter
->
blockList
,
&
blockInfo
);
// tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
// }
// set data block index overflow, in order to disable the offset comparator
if
(
sup
.
indexPerTable
[
pos
]
>=
sup
.
numOfBlocksPerTable
[
pos
])
{
sup
.
indexPerTable
[
pos
]
=
sup
.
numOfBlocksPerTable
[
pos
]
+
1
;
}
// /*
// * available when no import exists
// * for(int32_t i = 0; i < cnt - 1; ++i) {
// * assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
// * }
// */
numOfTotal
+=
1
;
tMergeTreeAdjust
(
pTree
,
tMergeTreeGetAdjustIndex
(
pTree
));
}
// tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle
->idStr);
// cleanBlockOrderSupporter(&sup, numOfTables
);
//
taosMemoryFree(pTree);
tsdbDebug
(
"%p %d data blocks sort completed, %s"
,
pReader
,
cnt
,
pReader
->
idStr
);
cleanupBlockOrderSupporter
(
&
sup
);
taosMemoryFree
(
pTree
);
// return TSDB_CODE_SUCCESS;
// }
pBlockIter
->
index
=
0
;
return
TSDB_CODE_SUCCESS
;
}
// static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists);
//
static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, STab
leBlockInfo* pNext, bool* exists) {
//
static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, SFi
leBlockInfo* pNext, bool* exists) {
// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
//
// while (1) {
// int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
// if (code != TSDB_CODE_SUCCESS || *exists) {
// return code;
// }
//
// if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
// (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
// // all data blocks in current file has been checked already, try next file if exists
...
...
@@ -2111,7 +2106,7 @@ _err:
// pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
// }
// }
//
}
//}
// static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists) {
// pTsdbReadHandle->numOfBlocks = 0;
...
...
@@ -2198,7 +2193,7 @@ _err:
// cur->slot = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 0 : pTsdbReadHandle->numOfBlocks - 1;
// cur->fid = pTsdbReadHandle->pFileGroup->fid;
// S
Tab
leBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
// S
Fi
leBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
// return getDataBlock(pTsdbReadHandle, pBlockInfo, exists);
// }
...
...
@@ -2222,52 +2217,163 @@ _err:
// return (numOfRows - startRow) / bucketRange;
// }
// static int32_t getDataBlocksInFiles(STsdbReader* pTsdbReadHandle, bool* exists) {
// STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// query ended in/started from current block
static
int32_t
dataBlockPartialRequired
(
STimeWindow
*
pWindow
,
SBlock
*
pBlock
)
{
return
((
pWindow
->
ekey
<
pBlock
->
maxKey
.
ts
&&
pWindow
->
ekey
>=
pBlock
->
minKey
.
ts
)
||
(
pWindow
->
skey
<=
pBlock
->
maxKey
.
ts
&&
pWindow
->
skey
>
pBlock
->
minKey
.
ts
));
}
// // find the start data block in file
// if (!pTsdbReadHandle->locateStart) {
// pTsdbReadHandle->locateStart = true;
// STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
// int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
static
SFileDataBlockInfo
*
getCurrentBlockInfo
(
SDataBlockIter
*
pBlockIter
)
{
SFileDataBlockInfo
*
pFBlockInfo
=
taosArrayGet
(
pBlockIter
->
blockList
,
pBlockIter
->
index
);
return
pFBlockInfo
;
}
// tsdbRLockFS(pFileHandle);
// tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
// tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
// tsdbUnLockFS(pFileHandle);
static
bool
overlapWithNeighborBlock
(
SBlock
*
pBlock
,
int32_t
blockIndex
)
{
return
false
;
}
// return getFirstFileDataBlock(pTsdbReadHandle, exists);
// } else {
// // check if current file block is all consumed
// STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
// STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
static
bool
bufferDataInFileBlockGap
(
int32_t
order
,
int64_t
key
,
SBlock
*
pBlock
)
{
bool
ascScan
=
ASCENDING_TRAVERSE
(
order
);
// // current block is done, try next
// if ((!cur->mixBlock) || cur->blockCompleted) {
// // all data blocks in current file has been checked already, try next file if exists
// } else {
// tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, cur->slot, cur->pos,
// pTsdbReadHandle->idStr);
// int32_t code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlockInfo->compBlock, pCheckInfo);
// *exists = (pTsdbReadHandle->realNumOfRows > 0);
return
(
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<=
pBlock
->
minKey
.
ts
))
||
(
!
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>=
pBlock
->
maxKey
.
ts
));
}
// if (code != TSDB_CODE_SUCCESS || *exists) {
// return code;
// }
// }
static
int32_t
buildInmemDataBlock
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlock
*
pBlock
,
TSDBKEY
*
key
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
// // current block is empty, try next block in file
// // all data blocks in current file has been checked already, try next file if exists
// if (isEndFileDataBlock(cur, pTsdbReadHandle->numOfBlocks, ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
// return getFirstFileDataBlock(pTsdbReadHandle, exists);
// } else {
// moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
// STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
// return getDataBlock(pTsdbReadHandle, pNext, exists);
// }
// }
// }
bool
ascScan
=
ASCENDING_TRAVERSE
(
pReader
->
order
);
bool
cacheDataInFileBlockHole
=
(
ascScan
&&
(
key
->
ts
!=
TSKEY_INITIAL_VAL
&&
key
->
ts
<
pBlock
->
minKey
.
ts
))
||
(
!
ascScan
&&
(
key
->
ts
!=
TSKEY_INITIAL_VAL
&&
key
->
ts
>
pBlock
->
maxKey
.
ts
));
ASSERT
(
cacheDataInFileBlockHole
);
// do not load file block into buffer
int32_t
step
=
ascScan
?
1
:
-
1
;
TSDBKEY
maxKey
=
{.
version
=
pReader
->
endVersion
};
maxKey
.
ts
=
ascScan
?
(
pBlock
->
minKey
.
ts
-
step
)
:
(
pBlock
->
maxKey
.
ts
-
step
);
pBlockScanInfo
->
memHasVal
=
tsdbTbDataIterNext
(
&
pBlockScanInfo
->
iter
);
pBlockScanInfo
->
imemHasVal
=
tsdbTbDataIterNext
(
&
pBlockScanInfo
->
iiter
);
code
=
tsdbReadRowsFromCache
(
pBlockScanInfo
,
maxKey
,
pReader
->
capacity
,
pReader
);
return
code
;
}
static
int32_t
buildComposedDataBlock
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SBlockData
*
pData
=
&
pReader
->
status
.
fileBlockData
;
STSchema
*
pSchema
=
NULL
;
SRowMerger
merge
=
{
0
};
int64_t
key
=
pData
->
aTSKEY
[
0
];
TSDBROW
*
pRow
=
getValidRow
(
&
pBlockScanInfo
->
iter
,
&
pBlockScanInfo
->
memHasVal
,
pReader
);
TSDBROW
*
piRow
=
getValidRow
(
&
pBlockScanInfo
->
iiter
,
&
pBlockScanInfo
->
imemHasVal
,
pReader
);
if
(
pBlockScanInfo
->
memHasVal
&&
pBlockScanInfo
->
imemHasVal
)
{
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
TSDBKEY
ik
=
TSDBROW_KEY
(
piRow
);
// todo check version in file
if
(
key
<
k
.
ts
||
key
<
ik
.
ts
)
{
tRowMergerInit
(
&
merge
,
NULL
,
pSchema
);
STSRow
*
pTsRow
=
NULL
;
doLoadRowsOfIdenticalTsInFileBlock
(
pData
,
pDumpInfo
,
key
,
&
merge
,
pReader
,
&
pTsRow
);
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
loadDataInFiles
(
STsdbReader
*
pReader
,
bool
*
exists
)
{
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
SFileSetIter
*
pFIter
=
&
pStatus
->
fileIter
;
if
(
pFIter
->
index
<
pFIter
->
numOfFiles
)
{
if
(
pReader
->
status
.
blockIter
.
index
==
-
1
)
{
int32_t
numOfBlocks
=
0
;
while
(
1
)
{
bool
hasNext
=
nextFilesetIterator
(
&
pStatus
->
fileIter
,
pReader
->
order
,
pReader
);
if
(
!
hasNext
)
{
// no data files on disk
break
;
}
SArray
*
pIndexList
=
taosArrayInit
(
4
,
sizeof
(
SBlockIdx
));
int32_t
code
=
doLoadBlockIndex
(
pReader
,
pReader
->
pFileReader
,
pIndexList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
taosArrayGetSize
(
pIndexList
)
>
0
)
{
uint32_t
numOfValidTable
=
0
;
code
=
doLoadFileBlock
(
pReader
,
pIndexList
,
&
numOfValidTable
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
numOfValidTable
>
0
)
{
break
;
}
}
// no blocks in current file, try next files
}
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
int32_t
code
=
initBlockIterator
(
pReader
,
pBlockIter
,
numOfBlocks
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
int64_t
key
=
0
;
// todo get the first qualified key in buffer
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
pBlockIter
);
STableBlockScanInfo
*
pScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pFBlock
->
uid
,
sizeof
(
pFBlock
->
uid
));
SBlock
*
pBlock
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pFBlock
->
tbBlockIdx
);
if
(
pScanInfo
->
iterInit
==
false
)
{
STbData
*
d
=
NULL
;
tsdbGetTbDataFromMemTable
(
pReader
->
pTsdb
->
mem
,
pReader
->
suid
,
pScanInfo
->
uid
,
&
d
);
TSDBKEY
startKey
=
{.
ts
=
pReader
->
window
.
skey
,
.
version
=
pReader
->
startVersion
};
tsdbTbDataIterOpen
(
d
,
&
startKey
,
0
,
&
pScanInfo
->
iter
);
STbData
*
di
=
NULL
;
tsdbGetTbDataFromMemTable
(
pReader
->
pTsdb
->
imem
,
pReader
->
suid
,
pScanInfo
->
uid
,
&
di
);
tsdbTbDataIterOpen
(
di
,
&
startKey
,
0
,
&
pScanInfo
->
iiter
);
pScanInfo
->
iterInit
=
true
;
}
if
(
dataBlockPartialRequired
(
&
pReader
->
window
,
pBlock
)
||
overlapWithNeighborBlock
(
pBlock
,
pFBlock
->
tbBlockIdx
))
{
SBlockData
data
=
{
0
};
doLoadFileBlockData
(
pReader
,
pBlockIter
,
pScanInfo
,
&
data
);
// build composed data block
}
else
if
(
bufferDataInFileBlockGap
(
pReader
->
order
,
key
,
pBlock
))
{
// data in memory that are earlier than current file block
TSDBKEY
maxKey
=
{.
ts
=
pReader
->
window
.
ekey
,
.
version
=
pReader
->
endVersion
};
buildInmemDataBlock
(
pReader
,
pScanInfo
,
pBlock
,
&
maxKey
);
// build data block from in-memory buffer data completed.
}
else
{
// whole block is required, return it directly
SDataBlockInfo
info
=
{
0
};
info
.
rows
=
pBlock
->
nRow
;
info
.
uid
=
pScanInfo
->
uid
;
info
.
window
.
skey
=
pBlock
->
minKey
.
ts
;
info
.
window
.
ekey
=
pBlock
->
maxKey
.
ts
;
}
}
else
{
}
}
return
TSDB_CODE_SUCCESS
;
}
// static bool doHasDataInBuffer(STsdbReader* pTsdbReadHandle) {
// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
...
...
@@ -2293,7 +2399,7 @@ _err:
// int32_t i = 0;
// while (i < numOfTables) {
// STable
Check
Info* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// STable
BlockScan
Info* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// // the first qualified table for interpolation query
// // if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
...
...
@@ -2309,71 +2415,188 @@ _err:
// return;
// }
// STable
CheckInfo info = *(STableCheck
Info*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// STable
BlockScanInfo info = *(STableBlockScan
Info*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
// info.lastKey = pTsdbReadHandle->window.skey;
// taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }
// static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
// STsdbReader* pTsdbReadHandle) {
// int numOfRows = 0;
// int curRows = 0;
// int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
// STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
// win->skey = TSKEY_INITIAL_VAL;
TSDBROW
*
getValidRow
(
STbDataIter
*
pIter
,
bool
*
hasVal
,
STsdbReader
*
pReader
)
{
if
(
!
hasVal
)
{
return
NULL
;
}
// int64_t st = taosGetTimestampUs();
// int16_t rv = -1;
// STSchema* pSchema = NULL;
// TSKEY lastRowKey = TSKEY_INITIAL_VAL;
TSDBROW
*
pRow
=
tsdbTbDataIterGet
(
pIter
);
// do {
// STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX);
// if (row == NULL) {
// break
;
//
}
TSDBKEY
key
=
TSDBROW_KEY
(
pRow
);
if
(
key
.
ts
>
pReader
->
window
.
ekey
)
{
*
hasVal
=
false
;
return
NULL
;
}
// TSKEY key = TD_ROW_KEY(row);
// if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
// (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
// tsdbDebug("%p key:%" PRIu64 " beyond qrange:%" PRId64 " - %" PRId64 ", no more data in buffer",
// pTsdbReadHandle,
// key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
if
(
key
.
version
<=
pReader
->
endVersion
)
{
return
pRow
;
}
// break;
// }
while
(
1
)
{
*
hasVal
=
tsdbTbDataIterNext
(
pIter
);
if
(
!
(
*
hasVal
))
{
return
NULL
;
}
// if (win->skey == INT64_MIN) {
// win->skey = key;
// }
pRow
=
tsdbTbDataIterGet
(
pIter
);
// win->ekey = key;
// if (rv != TD_ROW_SVER(row)) {
// pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, TD_ROW_SVER(row));
// rv = TD_ROW_SVER(row);
// }
// numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols,
// pCheckInfo->tableId,
// pSchema, NULL, pCfg->update, &lastRowKey);
key
=
TSDBROW_KEY
(
pRow
);
if
(
key
.
ts
>
pReader
->
window
.
ekey
)
{
*
hasVal
=
false
;
return
NULL
;
}
// if (numOfRows >= maxRowsToRead) {
// moveToNextRowInMem(pCheckInfo);
// break;
// }
if
(
key
.
version
<=
pReader
->
endVersion
)
{
return
pRow
;
}
}
}
// } while (moveToNextRowInMem(pCheckInfo));
int32_t
doLoadRowsOfIdenticalTs
(
STbDataIter
*
pIter
,
bool
*
hasVal
,
int64_t
ts
,
SRowMerger
*
pMerger
,
STsdbReader
*
pReader
)
{
while
(
1
)
{
*
hasVal
=
tsdbTbDataIterNext
(
pIter
);
if
(
!*
hasVal
)
{
break
;
}
// taosMemoryFreeClear(pSchema); // free the STSChema
// assert(numOfRows <= maxRowsToRead);
TSDBROW
*
pRow
=
getValidRow
(
pIter
,
hasVal
,
pReader
);
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
if
(
k
.
ts
>
ts
)
{
break
;
}
// int64_t elapsedTime = taosGetTimestampUs() - st;
// tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
// pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
tRowMerge
(
pMerger
,
pRow
);
}
// return numOfRows;
// }
return
TSDB_CODE_SUCCESS
;
}
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SBlockData
*
pData
,
SFileBlockDumpInfo
*
pDumpInfo
,
int64_t
ts
,
SRowMerger
*
pMerger
,
STsdbReader
*
pReader
,
STSRow
**
pRow
)
{
int64_t
key
=
pData
->
aTSKEY
[
pDumpInfo
->
rowIndex
];
if
((
pDumpInfo
->
rowIndex
<
pData
->
nRow
-
1
))
{
if
(
pData
->
aTSKEY
[
pDumpInfo
->
rowIndex
+
1
]
<
key
)
{
SRowMerger
merger
=
{
0
};
tRowMergerInit
(
&
merger
,
NULL
,
NULL
);
int32_t
rowIndex
=
pDumpInfo
->
rowIndex
+
1
;
while
(
pData
->
aTSKEY
[
rowIndex
]
==
key
)
{
tRowMerge
(
&
merger
,
NULL
);
}
tRowMergerGetRow
(
&
merger
,
pRow
);
tRowMergerClear
(
&
merger
);
}
}
else
{
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbGetNextRowInMem
(
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
STSRow
**
pTSRow
)
{
STSchema
*
pSchema
=
NULL
;
// todo set the correct schema
TSKEY
mergeTs
=
TSKEY_INITIAL_VAL
;
SRowMerger
merge
=
{
0
};
TSDBROW
*
pRow
=
getValidRow
(
&
pBlockScanInfo
->
iter
,
&
pBlockScanInfo
->
memHasVal
,
pReader
);
TSDBROW
*
piRow
=
getValidRow
(
&
pBlockScanInfo
->
iiter
,
&
pBlockScanInfo
->
imemHasVal
,
pReader
);
TSDBKEY
k
=
{.
ts
=
TSKEY_INITIAL_VAL
};
TSDBKEY
ik
=
{.
ts
=
TSKEY_INITIAL_VAL
};
if
(
pBlockScanInfo
->
memHasVal
&&
pBlockScanInfo
->
imemHasVal
)
{
k
=
TSDBROW_KEY
(
pRow
);
ik
=
TSDBROW_KEY
(
piRow
);
if
(
ik
.
ts
<=
k
.
ts
)
{
tRowMergerInit
(
&
merge
,
piRow
,
pSchema
);
doLoadRowsOfIdenticalTs
(
&
pBlockScanInfo
->
iiter
,
&
pBlockScanInfo
->
imemHasVal
,
ik
.
ts
,
&
merge
,
pReader
);
if
(
k
.
ts
==
mergeTs
)
{
doLoadRowsOfIdenticalTs
(
&
pBlockScanInfo
->
iter
,
&
pBlockScanInfo
->
memHasVal
,
k
.
ts
,
&
merge
,
pReader
);
}
tRowMergerGetRow
(
&
merge
,
pTSRow
);
return
TSDB_CODE_SUCCESS
;
}
else
{
// k.ts < ik.ts
tRowMergerInit
(
&
merge
,
pRow
,
pSchema
);
doLoadRowsOfIdenticalTs
(
&
pBlockScanInfo
->
iter
,
&
pBlockScanInfo
->
memHasVal
,
k
.
ts
,
&
merge
,
pReader
);
tRowMergerGetRow
(
&
merge
,
pTSRow
);
return
TSDB_CODE_SUCCESS
;
}
}
if
(
pBlockScanInfo
->
memHasVal
)
{
tRowMergerInit
(
&
merge
,
pRow
,
pSchema
);
doLoadRowsOfIdenticalTs
(
&
pBlockScanInfo
->
iter
,
&
pBlockScanInfo
->
memHasVal
,
k
.
ts
,
&
merge
,
pReader
);
tRowMergerGetRow
(
&
merge
,
pTSRow
);
return
TSDB_CODE_SUCCESS
;
}
if
(
pBlockScanInfo
->
imemHasVal
)
{
tRowMergerInit
(
&
merge
,
piRow
,
pSchema
);
doLoadRowsOfIdenticalTs
(
&
pBlockScanInfo
->
iiter
,
&
pBlockScanInfo
->
imemHasVal
,
ik
.
ts
,
&
merge
,
pReader
);
tRowMergerGetRow
(
&
merge
,
pTSRow
);
return
TSDB_CODE_SUCCESS
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbReadRowsFromCache
(
STableBlockScanInfo
*
pBlockScanInfo
,
TSDBKEY
maxKey
,
int32_t
capacity
,
STsdbReader
*
pReader
)
{
int32_t
numOfRows
=
0
;
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pReader
->
pResBlock
->
pDataBlock
);
SSDataBlock
*
pBlock
=
pReader
->
pResBlock
;
int64_t
st
=
taosGetTimestampUs
();
STSchema
*
pSchema
=
NULL
;
do
{
STSRow
*
pTSRow
=
NULL
;
tsdbGetNextRowInMem
(
pBlockScanInfo
,
pReader
,
&
pTSRow
);
// todo assign to ssdatablock
if
(
pBlockScanInfo
->
memHasVal
)
{
TSDBROW
*
pRow
=
tsdbTbDataIterGet
(
&
pBlockScanInfo
->
iter
);
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
if
(
k
.
ts
>=
maxKey
.
ts
)
{
break
;
}
}
if
(
pBlockScanInfo
->
imemHasVal
)
{
TSDBROW
*
pRow
=
tsdbTbDataIterGet
(
&
pBlockScanInfo
->
iiter
);
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
if
(
k
.
ts
>=
maxKey
.
ts
)
{
break
;
}
}
// no data in buffer, return immediately
if
(
!
(
pBlockScanInfo
->
memHasVal
||
pBlockScanInfo
->
imemHasVal
))
{
break
;
}
if
(
numOfRows
>=
capacity
)
{
break
;
}
}
while
(
1
);
taosMemoryFreeClear
(
pSchema
);
assert
(
numOfRows
<=
capacity
);
int64_t
elapsedTime
=
taosGetTimestampUs
()
-
st
;
tsdbDebug
(
"%p build data block from cache completed, elapsed time:%"
PRId64
" us, numOfRows:%d, numOfCols:%d, %s"
,
pReader
,
elapsedTime
,
pBlock
->
info
.
rows
,
numOfCols
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
}
// static void destroyHelper(void* param) {
// if (param == NULL) {
...
...
@@ -2398,7 +2621,7 @@ _err:
// // check if the query range overlaps with the file data block
// bool exists = true;
// int32_t code =
getDataBlocks
InFiles(pTsdbReadHandle, &exists);
// int32_t code =
loadData
InFiles(pTsdbReadHandle, &exists);
// if (code != TSDB_CODE_SUCCESS) {
// pTsdbReadHandle->checkFiles = false;
// return false;
...
...
@@ -2459,7 +2682,7 @@ _err:
// int32_t curRow = 0;
// if (++pTsdbReadHandle->activeIndex < numOfTables) {
// STable
Check
Info* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
// STable
BlockScan
Info* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
// // int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
// // if (ret != TSDB_CODE_SUCCESS) {
// // return false;
...
...
@@ -2494,7 +2717,7 @@ _err:
// return true;
// }
// STable
Check
Info* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
// STable
BlockScan
Info* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
// pCheckInfo->numOfBlocks = 0;
// pTsdbReadHandle->activeIndex += 1;
...
...
@@ -2543,17 +2766,6 @@ _err:
// return true;
// }
// static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
// if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
// return -1;
// } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
// return 1;
// } else {
// ASSERT(false);
// return 0;
// }
// }
// static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
// if (pColumnInfoData == NULL) {
// return NULL;
...
...
@@ -2572,7 +2784,7 @@ _err:
// static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
// size_t size = taosArrayGetSize(pTableCheckInfo);
// for (int32_t i = 0; i < size; ++i) {
// STable
Check
Info* p = taosArrayGet(pTableCheckInfo, i);
// STable
BlockScan
Info* p = taosArrayGet(pTableCheckInfo, i);
// destroyTableMemIterator(p);
// taosMemoryFreeClear(p->pCompInfo);
...
...
@@ -2583,25 +2795,31 @@ _err:
// }
// ====================================== EXPOSED APIs ======================================
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
tableList
,
uint64_t
qId
,
uint64_t
taskId
,
STsdbReader
**
ppReader
)
{
int32_t
code
=
0
;
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
pTableList
,
uint64_t
qId
,
uint64_t
taskId
,
STsdbReader
**
ppReader
)
{
char
buf
[
128
]
=
{
0
};
snprintf
(
buf
,
tListLen
(
buf
),
"TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
taskId
,
qId
)
;
code
=
tsdbReaderCreate
(
pVnode
,
pCond
,
qId
,
taskId
,
ppReader
);
if
(
code
)
goto
_err
;
int32_t
code
=
tsdbReaderCreate
(
pVnode
,
pCond
,
ppReader
,
buf
);
if
(
code
)
{
goto
_err
;
}
// if (emptyQueryTimewindow(pReader)) {
// return (STsdbReader*)pReader;
// }
STsdbReader
*
pReader
=
*
ppReader
;
if
(
isEmptyQueryTimeWindow
(
pReader
))
{
tsdbDebug
(
"%p query window not overlaps with the data set, no result returned, %s"
,
pReader
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
}
// // todo apply the lastkey of table check to avoid to load header file
// pReader->pTableCheckInfo = createCheckInfoFromTableGroup(pReader, tableList);
// if (pReader->pTableCheckInfo == NULL) {
// // tsdbReaderClose(pTsdbReadHandle);
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return NULL;
// }
pReader
->
status
.
pTableMap
=
createDataBlockScanInfo
(
pReader
,
pCond
->
uidList
,
pCond
->
numOfTables
);
if
(
pReader
->
status
.
pTableMap
==
NULL
)
{
tsdbReaderClose
(
pReader
);
*
ppReader
=
NULL
;
code
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
#if 0
// int32_t code = setCurrentSchema(pVnode, pReader);
// if (code != TSDB_CODE_SUCCESS) {
// terrno = code;
...
...
@@ -2627,94 +2845,90 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInf
// return NULL;
// }
// }
#endif
// tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pReader,
// taosArrayGetSize(pReader->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pReader->idStr);
tsdbDebug
(
"%p total numOfTable:%d in this query %s"
,
pReader
,
pCond
->
numOfTables
,
pReader
->
idStr
);
return
code
;
_err:
// tsdbError(""
);
tsdbError
(
"failed to create tsdb reader, code: %s %s"
,
tstrerror
(
code
),
pReader
->
idStr
);
return
code
;
}
void
tsdbReaderClose
(
STsdbReader
*
pReader
)
{
// if (pReader == NULL) {
// return;
// }
// pReader->pColumns = doFreeColumnInfoData(pReader->pColumns);
// taosArrayDestroy(pReader->suppInfo.defaultLoadColumn);
// taosMemoryFreeClear(pReader->pDataBlockInfo);
// taosMemoryFreeClear(pReader->suppInfo.pstatis);
// taosMemoryFreeClear(pReader->suppInfo.plist);
// taosMemoryFree(pReader->suppInfo.slotIds);
if
(
pReader
==
NULL
)
{
return
;
}
// if (!emptyQueryTimewindow(pReader)) {
// // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
// } else {
// assert(pReader->pTableCheckInfo == NULL);
// }
blockDataDestroy
(
pReader
->
pResBlock
);
// if (pReader->pTableCheckInfo != NULL) {
// pReader->pTableCheckInfo = destroyTableCheckInfo(pReader->pTableCheckInfo
);
// }
taosMemoryFreeClear
(
pReader
->
suppInfo
.
pstatis
);
taosMemoryFreeClear
(
pReader
->
suppInfo
.
plist
);
taosMemoryFree
(
pReader
->
suppInfo
.
slotIds
);
// tsdbDestroyReadH(&pReader->rhelper);
if
(
!
isEmptyQueryTimeWindow
(
pReader
))
{
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
}
else
{
ASSERT
(
pReader
->
status
.
pTableMap
==
NULL
);
}
#if 0
// if (pReader->status.pTableScanInfo != NULL) {
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
// }
// tdFreeDataCols(pReader->pDataCols);
// pReader->pDataCols = NULL;
// tsdbDestroyReadH(&pReader->rhelper);
// pReader->prev = doFreeColumnInfoData(pReader->prev);
// pReader->next = doFreeColumnInfoData(pReader->next);
// tdFreeDataCols(pReader->pDataCols);
// pReader->pDataCols = NULL;
//
// pReader->prev = doFreeColumnInfoData(pReader->prev);
// pReader->next = doFreeColumnInfoData(pReader->next);
#endif
//
SIOCostSummary* pCost = &pReader->cost;
SIOCostSummary
*
pCost
=
&
pReader
->
cost
;
// tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%"
// PRId64
// " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
// pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime,
// pCost->checkForNextTime, pReader->idStr);
tsdbDebug
(
"%p :io-cost summary: head-file read cnt:%"
PRIu64
", head-file time:%"
PRIu64
" us, statis-info:%"
PRId64
" us, datablock:%"
PRId64
" us, check data:%"
PRId64
" us, %s"
,
pReader
,
pCost
->
headFileLoad
,
pCost
->
headFileLoadTime
,
pCost
->
statisInfoLoadTime
,
pCost
->
blockLoadTime
,
pCost
->
checkForNextTime
,
pReader
->
idStr
);
//
taosMemoryFree(pReader->idStr);
//
taosMemoryFree(pReader->pSchema);
//
taosMemoryFreeClear(pReader);
taosMemoryFree
(
pReader
->
idStr
);
taosMemoryFree
(
pReader
->
pSchema
);
taosMemoryFreeClear
(
pReader
);
}
bool
tsdbNextDataBlock
(
STsdbReader
*
pReader
)
{
bool
ret
=
false
;
// size_t numOfCols = taosArrayGetSize(pReader->pColumns);
// for (int32_t i = 0; i < numOfCols; ++i) {
// SColumnInfoData* pColInfo = taosArrayGet(pReader->pColumns, i);
// colInfoDataCleanup(pColInfo, pReader->outputCapacity);
// }
if
(
isEmptyQueryTimeWindow
(
pReader
))
{
return
false
;
}
// if (emptyQueryTimewindow(pReader)) {
// tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
// return false;
// }
// cleanup the data that belongs to the previous data block
blockDataCleanup
(
pReader
->
pResBlock
);
// int64_t stime = taosGetTimestampUs();
// int64_t elapsedTime = stime;
// // TODO refactor: remove "type"
// if (pReader->type == TSDB_QUERY_TYPE_LAST) {
// if (pReader->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
// // return loadCachedLastRow(pTsdbReadHandle);
// } else if (pReader->cachelastrow == TSDB_CACHED_TYPE_LAST) {
// // return loadCachedLast(pTsdbReadHandle);
// }
// }
int64_t
stime
=
taosGetTimestampUs
();
int64_t
elapsedTime
=
stime
;
if
(
pReader
->
type
==
BLOCK_LOAD_OFFSET_ORDER
)
{
if
(
pReader
->
status
.
loadFromFile
)
{
bool
exists
=
true
;
int32_t
code
=
loadDataInFiles
(
pReader
,
&
exists
);
}
else
{
// no data in files, let's try in-memory buffer
}
}
else
if
(
pReader
->
type
==
BLOCK_LOAD_TABLESEQ_ORDER
)
{
}
else
if
(
pReader
->
type
==
BLOCK_LOAD_EXTERN_ORDER
)
{
}
// if (pReader->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
// return loadDataBlockFromTableSeq(pReader);
// } else { // loadType == RR and Offset Order
// if (pReader->checkFiles) {
// // check if the query range overlaps with the file data block
// bool exists = true;
// int32_t code = getDataBlocksInFiles(pReader, &exists);
// int32_t code = loadDataInFiles(pReader, &exists);
// if (code != TSDB_CODE_SUCCESS) {
// pReader->activeIndex = 0;
// pReader->checkFiles = false;
...
...
@@ -2749,10 +2963,10 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI
// // there are data in file
// if (pReader->cur.fid != INT32_MIN) {
// S
Tab
leBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[cur->slot];
// S
Fi
leBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[cur->slot];
// uid = pBlockInfo->pTableCheckInfo->tableId;
// } else {
// STable
Check
Info* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex);
// STable
BlockScan
Info* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex);
// uid = pCheckInfo->tableId;
// }
...
...
@@ -2781,7 +2995,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg***
// return TSDB_CODE_SUCCESS;
// }
// S
Tab
leBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot];
// S
Fi
leBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot];
// assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0)));
// // file block with sub-blocks has no statistics data
...
...
@@ -2854,8 +3068,8 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
// if (pReader->cur.fid == INT32_MIN) {
// return pReader->pColumns;
// } else {
// S
Tab
leBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[pReader->cur.slot];
// STable
Check
Info* pCheckInfo = pBlockInfo->pTableCheckInfo;
// S
Fi
leBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[pReader->cur.slot];
// STable
BlockScan
Info* pCheckInfo = pBlockInfo->pTableCheckInfo;
// if (pReader->cur.mixBlock) {
// return pReader->pColumns;
...
...
@@ -2871,7 +3085,7 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
// return pReader->pColumns;
// } else { // only load the file block
// SBlock* pBlock = pBlockInfo->compBlock;
// if (doLoadFile
DataBlock
(pReader, pBlock, pCheckInfo, pReader->cur.slot) != TSDB_CODE_SUCCESS) {
// if (doLoadFile
BlockData
(pReader, pBlock, pCheckInfo, pReader->cur.slot) != TSDB_CODE_SUCCESS) {
// return NULL;
// }
...
...
@@ -2884,7 +3098,7 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
}
void
tsdbResetReadHandle
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
,
int32_t
tWinIdx
)
{
// if (
emptyQueryTimew
indow(pReader)) {
// if (
isEmptyQueryTimeW
indow(pReader)) {
// if (pCond->order != pReader->order) {
// pReader->order = pCond->order;
// TSWAP(pReader->window.skey, pReader->window.ekey);
...
...
@@ -2998,7 +3212,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
// pTableBlockInfo->numOfBlocks += numOfBlocks;
// for (int32_t i = 0; i < numOfTables; ++i) {
// STable
Check
Info* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
// STable
BlockScan
Info* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
// SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
...
...
@@ -3040,7 +3254,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
// size_t size = taosArrayGetSize(pReader->pTableCheckInfo);
// for (int32_t i = 0; i < size; ++i) {
// STable
Check
Info* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
// STable
BlockScan
Info* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
// // if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
// // pMem = pMemT->tData[pCheckInfo->tableId];
...
...
source/libs/executor/src/executil.c
浏览文件 @
dbd8c300
...
...
@@ -620,8 +620,6 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
}
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
)
{
pCond
->
loadExternalRows
=
false
;
pCond
->
order
=
pTableScanNode
->
scanSeq
[
0
]
>
0
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pCond
->
numOfCols
=
LIST_LENGTH
(
pTableScanNode
->
scan
.
pScanCols
);
pCond
->
colList
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
SColumnInfo
));
...
...
@@ -647,15 +645,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
}
#endif
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfTWindows
;
++
i
)
{
if
((
pCond
->
order
==
TSDB_ORDER_ASC
&&
pCond
->
twindows
[
i
].
skey
>
pCond
->
twindows
[
i
].
ekey
)
||
(
pCond
->
order
==
TSDB_ORDER_DESC
&&
pCond
->
twindows
[
i
].
skey
<
pCond
->
twindows
[
i
].
ekey
))
{
TSWAP
(
pCond
->
twindows
[
i
].
skey
,
pCond
->
twindows
[
i
].
ekey
);
}
}
taosqsort
(
pCond
->
twindows
,
pCond
->
numOfTWindows
,
sizeof
(
STimeWindow
),
pCond
,
compareTimeWindow
);
pCond
->
type
=
BLOCK_LOAD_OFFSET_SEQ_ORDER
;
pCond
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
// pCond->type = pTableScanNode->scanFlag;
int32_t
j
=
0
;
...
...
@@ -677,6 +667,5 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
}
void
cleanupQueryTableDataCond
(
SQueryTableDataCond
*
pCond
)
{
taosMemoryFree
(
pCond
->
twindows
);
taosMemoryFree
(
pCond
->
colList
);
}
\ No newline at end of file
source/libs/executor/src/executorimpl.c
浏览文件 @
dbd8c300
...
...
@@ -4306,7 +4306,11 @@ STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle
}
STsdbReader
*
pReader
;
tsdbReaderOpen
(
pHandle
->
vnode
,
&
cond
,
pTableListInfo
,
queryId
,
taskId
,
&
pReader
);
code
=
tsdbReaderOpen
(
pHandle
->
vnode
,
&
cond
,
pTableListInfo
,
queryId
,
taskId
,
&
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
cleanupQueryTableDataCond
(
&
cond
);
return
pReader
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录