Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
85afbf0d
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
85afbf0d
编写于
5月 23, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do some internal refactor.
上级
a63473b1
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
78 addition
and
77 deletion
+78
-77
include/libs/executor/storageapi.h
include/libs/executor/storageapi.h
+18
-17
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+2
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+8
-8
source/libs/executor/src/executorInt.c
source/libs/executor/src/executorInt.c
+1
-1
source/libs/executor/src/operator.c
source/libs/executor/src/operator.c
+2
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+44
-44
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+3
-3
未找到文件。
include/libs/executor/storageapi.h
浏览文件 @
85afbf0d
...
...
@@ -225,22 +225,23 @@ typedef int32_t (*__store_reader_open_fn_t)(void *pVnode, SQueryTableDataCond *p
int32_t
numOfTables
,
SSDataBlock
*
pResBlock
,
void
**
ppReader
,
const
char
*
idstr
,
bool
countOnly
,
SHashObj
**
pIgnoreTables
);
typedef
struct
SStoreTSDReader
{
__store_reader_open_fn_t
storeReaderOpen
;
void
(
*
storeReaderClose
)();
void
(
*
setReaderId
)(
void
*
pReader
,
const
char
*
pId
);
void
(
*
storeReaderSetTableList
)();
int32_t
(
*
storeReaderNextDataBlock
)();
int32_t
(
*
storeReaderRetrieveBlockSMA
)();
SSDataBlock
*
(
*
storeReaderRetrieveDataBlock
)();
void
(
*
storeReaderReleaseDataBlock
)();
void
(
*
storeReaderResetStatus
)();
void
(
*
storeReaderGetDataBlockDistInfo
)();
void
(
*
storeReaderGetNumOfInMemRows
)();
void
(
*
storeReaderNotifyClosing
)();
}
SStoreTSDReader
;
typedef
struct
TsdReader
{
__store_reader_open_fn_t
tsdReaderOpen
;
void
(
*
tsdReaderClose
)();
void
(
*
tsdSetReaderTaskId
)(
void
*
pReader
,
const
char
*
pId
);
void
(
*
tsdSetQueryTableList
)();
int32_t
(
*
tsdReaderNextDataBlock
)();
int32_t
(
*
tsdReaderRetrieveBlockSMAInfo
)();
SSDataBlock
*
(
*
tsdReaderRetrieveDataBlock
)();
void
(
*
tsdReaderReleaseDataBlock
)();
void
(
*
tsdReaderResetStatus
)();
void
(
*
tsdReaderGetDataBlockDistInfo
)();
void
(
*
tsdReaderGetNumOfInMemRows
)();
void
(
*
tsdReaderNotifyClosing
)();
}
TsdReader
;
/**
* int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
...
...
@@ -551,7 +552,7 @@ typedef struct SStateStore {
typedef
struct
SStorageAPI
{
SStoreMeta
metaFn
;
// todo: refactor
SStoreTSD
Reader
tsdReader
;
Tsd
Reader
tsdReader
;
SStoreMetaReader
metaReaderFn
;
SStoreCacheReader
cacheFn
;
SStoreSnapshotFn
snapshotFn
;
...
...
source/libs/executor/inc/executorInt.h
浏览文件 @
85afbf0d
...
...
@@ -208,7 +208,7 @@ typedef struct STableScanBase {
SLimitInfo
limitInfo
;
// there are more than one table list exists in one task, if only one vnode exists.
STableListInfo
*
pTableListInfo
;
SStoreTSD
Reader
readerAPI
;
Tsd
Reader
readerAPI
;
}
STableScanBase
;
typedef
struct
STableScanInfo
{
...
...
@@ -224,7 +224,7 @@ typedef struct STableScanInfo {
int8_t
assignBlockUid
;
bool
hasGroupByTag
;
bool
countOnly
;
SStoreTSD
Reader
readerAPI
;
Tsd
Reader
readerAPI
;
}
STableScanInfo
;
typedef
struct
STableMergeScanInfo
{
...
...
source/libs/executor/src/executor.c
浏览文件 @
85afbf0d
...
...
@@ -166,7 +166,7 @@ void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI *pAPI) {
if
(
pStreamScanInfo
->
pTableScanOp
!=
NULL
)
{
STableScanInfo
*
pScanInfo
=
pStreamScanInfo
->
pTableScanOp
->
info
;
if
(
pScanInfo
->
base
.
dataReader
!=
NULL
)
{
pAPI
->
tsdReader
.
setReader
Id
(
pScanInfo
->
base
.
dataReader
,
pTaskInfo
->
id
.
str
);
pAPI
->
tsdReader
.
tsdSetReaderTask
Id
(
pScanInfo
->
base
.
dataReader
,
pTaskInfo
->
id
.
str
);
}
}
}
else
{
...
...
@@ -1087,7 +1087,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
// todo refactor: move away
pTaskInfo
->
storageAPI
.
tsdReader
.
store
ReaderClose
(
pScanBaseInfo
->
dataReader
);
pTaskInfo
->
storageAPI
.
tsdReader
.
tsd
ReaderClose
(
pScanBaseInfo
->
dataReader
);
pScanBaseInfo
->
dataReader
=
NULL
;
ASSERT
(
0
);
...
...
@@ -1148,7 +1148,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pScanInfo
->
scanTimes
=
0
;
if
(
pScanBaseInfo
->
dataReader
==
NULL
)
{
int32_t
code
=
pTaskInfo
->
storageAPI
.
tsdReader
.
store
ReaderOpen
(
pScanBaseInfo
->
readHandle
.
vnode
,
&
pScanBaseInfo
->
cond
,
&
keyInfo
,
1
,
int32_t
code
=
pTaskInfo
->
storageAPI
.
tsdReader
.
tsd
ReaderOpen
(
pScanBaseInfo
->
readHandle
.
vnode
,
&
pScanBaseInfo
->
cond
,
&
keyInfo
,
1
,
pScanInfo
->
pResBlock
,
(
void
**
)
&
pScanBaseInfo
->
dataReader
,
id
,
false
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"prepare read tsdb snapshot failed, uid:%"
PRId64
", code:%s %s"
,
pOffset
->
uid
,
tstrerror
(
code
),
id
);
...
...
@@ -1159,8 +1159,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
qDebug
(
"tsdb reader created with offset(snapshot) uid:%"
PRId64
" ts:%"
PRId64
" table index:%d, total:%d, %s"
,
uid
,
pScanBaseInfo
->
cond
.
twindows
.
skey
,
pScanInfo
->
currentTable
,
numOfTables
,
id
);
}
else
{
pTaskInfo
->
storageAPI
.
tsdReader
.
storeReaderSet
TableList
(
pScanBaseInfo
->
dataReader
,
&
keyInfo
,
1
);
pTaskInfo
->
storageAPI
.
tsdReader
.
store
ReaderResetStatus
(
pScanBaseInfo
->
dataReader
,
&
pScanBaseInfo
->
cond
);
pTaskInfo
->
storageAPI
.
tsdReader
.
tsdSetQuery
TableList
(
pScanBaseInfo
->
dataReader
,
&
keyInfo
,
1
);
pTaskInfo
->
storageAPI
.
tsdReader
.
tsd
ReaderResetStatus
(
pScanBaseInfo
->
dataReader
,
&
pScanBaseInfo
->
cond
);
qDebug
(
"tsdb reader offset seek snapshot to uid:%"
PRId64
" ts %"
PRId64
" table index:%d numOfTable:%d, %s"
,
uid
,
pScanBaseInfo
->
cond
.
twindows
.
skey
,
pScanInfo
->
currentTable
,
numOfTables
,
id
);
}
...
...
@@ -1189,7 +1189,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
SMetaTableInfo
mtInfo
=
pTaskInfo
->
storageAPI
.
snapshotFn
.
getTableInfoFromSnapshot
(
sContext
);
pTaskInfo
->
storageAPI
.
tsdReader
.
store
ReaderClose
(
pInfo
->
dataReader
);
pTaskInfo
->
storageAPI
.
tsdReader
.
tsd
ReaderClose
(
pInfo
->
dataReader
);
pInfo
->
dataReader
=
NULL
;
cleanupQueryTableDataCond
(
&
pTaskInfo
->
streamInfo
.
tableCond
);
...
...
@@ -1207,7 +1207,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo
*
pList
=
tableListGetInfo
(
pTableListInfo
,
0
);
int32_t
size
=
tableListGetSize
(
pTableListInfo
);
pTaskInfo
->
storageAPI
.
tsdReader
.
store
ReaderOpen
(
pInfo
->
vnode
,
&
pTaskInfo
->
streamInfo
.
tableCond
,
pList
,
size
,
NULL
,
(
void
**
)
&
pInfo
->
dataReader
,
NULL
,
pTaskInfo
->
storageAPI
.
tsdReader
.
tsd
ReaderOpen
(
pInfo
->
vnode
,
&
pTaskInfo
->
streamInfo
.
tableCond
,
pList
,
size
,
NULL
,
(
void
**
)
&
pInfo
->
dataReader
,
NULL
,
false
,
NULL
);
cleanupQueryTableDataCond
(
&
pTaskInfo
->
streamInfo
.
tableCond
);
...
...
@@ -1228,7 +1228,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
id
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
SStreamRawScanInfo
*
pInfo
=
pOperator
->
info
;
pTaskInfo
->
storageAPI
.
tsdReader
.
store
ReaderClose
(
pInfo
->
dataReader
);
pTaskInfo
->
storageAPI
.
tsdReader
.
tsd
ReaderClose
(
pInfo
->
dataReader
);
pInfo
->
dataReader
=
NULL
;
qDebug
(
"tmqsnap qStreamPrepareScan snapshot log, %s"
,
id
);
}
...
...
source/libs/executor/src/executorInt.c
浏览文件 @
85afbf0d
...
...
@@ -1195,7 +1195,7 @@ void qStreamCloseTsdbReader(void* task) {
qDebug
(
"wait for the reader stopping"
);
}
pTaskInfo
->
storageAPI
.
tsdReader
.
store
ReaderClose
(
pTSInfo
->
base
.
dataReader
);
pTaskInfo
->
storageAPI
.
tsdReader
.
tsd
ReaderClose
(
pTSInfo
->
base
.
dataReader
);
pTSInfo
->
base
.
dataReader
=
NULL
;
// restore the status, todo refactor.
...
...
source/libs/executor/src/operator.c
浏览文件 @
85afbf0d
...
...
@@ -239,7 +239,7 @@ static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam,
STableScanInfo
*
pInfo
=
pOperator
->
info
;
if
(
pInfo
->
base
.
dataReader
!=
NULL
)
{
pAPI
->
tsdReader
.
store
ReaderNotifyClosing
(
pInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderNotifyClosing
(
pInfo
->
base
.
dataReader
);
}
return
OPTR_FN_RET_ABORT
;
}
else
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
...
...
@@ -248,7 +248,7 @@ static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam,
if
(
pInfo
->
pTableScanOp
!=
NULL
)
{
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
if
(
pTableScanInfo
!=
NULL
&&
pTableScanInfo
->
base
.
dataReader
!=
NULL
)
{
pAPI
->
tsdReader
.
store
ReaderNotifyClosing
(
pTableScanInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderNotifyClosing
(
pTableScanInfo
->
base
.
dataReader
);
}
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
85afbf0d
...
...
@@ -267,7 +267,7 @@ static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
SStorageAPI
*
pAPI
=
&
pTaskInfo
->
storageAPI
;
bool
allColumnsHaveAgg
=
true
;
int32_t
code
=
pAPI
->
tsdReader
.
storeReaderRetrieveBlockSMA
(
pTableScanInfo
->
dataReader
,
pBlock
,
&
allColumnsHaveAgg
);
int32_t
code
=
pAPI
->
tsdReader
.
tsdReaderRetrieveBlockSMAInfo
(
pTableScanInfo
->
dataReader
,
pBlock
,
&
allColumnsHaveAgg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -350,7 +350,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pCost
->
filterOutBlocks
+=
1
;
pCost
->
totalRows
+=
pBlock
->
info
.
rows
;
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_NOT_LOAD
)
{
qDebug
(
"%s data block skipped, brange:%"
PRId64
"-%"
PRId64
", rows:%"
PRId64
", uid:%"
PRIu64
,
...
...
@@ -358,7 +358,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pBlockInfo
->
id
.
uid
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
pBlock
->
info
.
rows
);
pCost
->
skipBlocks
+=
1
;
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_SMA_LOAD
)
{
pCost
->
loadBlockStatis
+=
1
;
...
...
@@ -368,7 +368,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qDebug
(
"%s data block SMA loaded, brange:%"
PRId64
"-%"
PRId64
", rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
pBlock
->
info
.
rows
);
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
return
TSDB_CODE_SUCCESS
;
}
else
{
qDebug
(
"%s failed to load SMA, since not all columns have SMA"
,
GET_TASKID
(
pTaskInfo
));
...
...
@@ -390,7 +390,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost
->
filterOutBlocks
+=
1
;
(
*
status
)
=
FUNC_DATA_REQUIRED_FILTEROUT
;
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
return
TSDB_CODE_SUCCESS
;
}
}
...
...
@@ -405,7 +405,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qDebug
(
"%s data block skipped due to dynamic prune, brange:%"
PRId64
"-%"
PRId64
", rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pCost
->
skipBlocks
+=
1
;
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pTableScanInfo
->
dataReader
);
STableScanInfo
*
p1
=
pOperator
->
info
;
if
(
taosHashGetSize
(
p1
->
pIgnoreTables
)
==
taosArrayGetSize
(
p1
->
base
.
pTableListInfo
->
pTableList
))
{
...
...
@@ -419,7 +419,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost
->
totalCheckedRows
+=
pBlock
->
info
.
rows
;
pCost
->
loadBlocks
+=
1
;
SSDataBlock
*
p
=
pAPI
->
tsdReader
.
store
ReaderRetrieveDataBlock
(
pTableScanInfo
->
dataReader
,
NULL
);
SSDataBlock
*
p
=
pAPI
->
tsdReader
.
tsd
ReaderRetrieveDataBlock
(
pTableScanInfo
->
dataReader
,
NULL
);
if
(
p
==
NULL
)
{
return
terrno
;
}
...
...
@@ -693,9 +693,9 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
int64_t
st
=
taosGetTimestampUs
();
while
(
true
)
{
code
=
pAPI
->
tsdReader
.
store
ReaderNextDataBlock
(
pTableScanInfo
->
base
.
dataReader
,
&
hasNext
);
code
=
pAPI
->
tsdReader
.
tsd
ReaderNextDataBlock
(
pTableScanInfo
->
base
.
dataReader
,
&
hasNext
);
if
(
code
)
{
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pTableScanInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pTableScanInfo
->
base
.
dataReader
);
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -704,12 +704,12 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
}
if
(
isTaskKilled
(
pTaskInfo
))
{
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pTableScanInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pTableScanInfo
->
base
.
dataReader
);
T_LONG_JMP
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pTableScanInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pTableScanInfo
->
base
.
dataReader
);
break
;
}
...
...
@@ -774,7 +774,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
qDebug
(
"start to repeat ascending order scan data blocks due to query func required, %s"
,
GET_TASKID
(
pTaskInfo
));
// do prepare for the next round table scan operation
pAPI
->
tsdReader
.
store
ReaderResetStatus
(
pTableScanInfo
->
base
.
dataReader
,
&
pTableScanInfo
->
base
.
cond
);
pAPI
->
tsdReader
.
tsd
ReaderResetStatus
(
pTableScanInfo
->
base
.
dataReader
,
&
pTableScanInfo
->
base
.
cond
);
}
}
...
...
@@ -782,7 +782,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
if
(
pTableScanInfo
->
scanTimes
<
total
)
{
if
(
pTableScanInfo
->
base
.
cond
.
order
==
TSDB_ORDER_ASC
)
{
prepareForDescendingScan
(
&
pTableScanInfo
->
base
,
pOperator
->
exprSupp
.
pCtx
,
0
);
pAPI
->
tsdReader
.
store
ReaderResetStatus
(
pTableScanInfo
->
base
.
dataReader
,
&
pTableScanInfo
->
base
.
cond
);
pAPI
->
tsdReader
.
tsd
ReaderResetStatus
(
pTableScanInfo
->
base
.
dataReader
,
&
pTableScanInfo
->
base
.
cond
);
qDebug
(
"%s start to descending order scan data blocks due to query func required"
,
GET_TASKID
(
pTaskInfo
));
}
...
...
@@ -800,7 +800,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
pTableScanInfo
->
base
.
scanFlag
=
MAIN_SCAN
;
qDebug
(
"%s start to repeat descending order scan data blocks"
,
GET_TASKID
(
pTaskInfo
));
pAPI
->
tsdReader
.
store
ReaderResetStatus
(
pTableScanInfo
->
base
.
dataReader
,
&
pTableScanInfo
->
base
.
cond
);
pAPI
->
tsdReader
.
tsd
ReaderResetStatus
(
pTableScanInfo
->
base
.
dataReader
,
&
pTableScanInfo
->
base
.
cond
);
}
}
}
...
...
@@ -839,11 +839,11 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tInfo
=
*
(
STableKeyInfo
*
)
tableListGetInfo
(
pInfo
->
base
.
pTableListInfo
,
pInfo
->
currentTable
);
taosRUnLockLatch
(
&
pTaskInfo
->
lock
);
pAPI
->
tsdReader
.
storeReaderSet
TableList
(
pInfo
->
base
.
dataReader
,
&
tInfo
,
1
);
pAPI
->
tsdReader
.
tsdSetQuery
TableList
(
pInfo
->
base
.
dataReader
,
&
tInfo
,
1
);
qDebug
(
"set uid:%"
PRIu64
" into scanner, total tables:%d, index:%d/%d %s"
,
tInfo
.
uid
,
numOfTables
,
pInfo
->
currentTable
,
numOfTables
,
GET_TASKID
(
pTaskInfo
));
pAPI
->
tsdReader
.
store
ReaderResetStatus
(
pInfo
->
base
.
dataReader
,
&
pInfo
->
base
.
cond
);
pAPI
->
tsdReader
.
tsd
ReaderResetStatus
(
pInfo
->
base
.
dataReader
,
&
pInfo
->
base
.
cond
);
pInfo
->
scanTimes
=
0
;
}
}
else
{
// scan table group by group sequentially
...
...
@@ -858,7 +858,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tableListGetGroupList
(
pInfo
->
base
.
pTableListInfo
,
pInfo
->
currentGroupId
,
&
pList
,
&
num
);
ASSERT
(
pInfo
->
base
.
dataReader
==
NULL
);
int32_t
code
=
pAPI
->
tsdReader
.
store
ReaderOpen
(
pInfo
->
base
.
readHandle
.
vnode
,
&
pInfo
->
base
.
cond
,
pList
,
num
,
pInfo
->
pResBlock
,
int32_t
code
=
pAPI
->
tsdReader
.
tsd
ReaderOpen
(
pInfo
->
base
.
readHandle
.
vnode
,
&
pInfo
->
base
.
cond
,
pList
,
num
,
pInfo
->
pResBlock
,
(
void
**
)
&
pInfo
->
base
.
dataReader
,
GET_TASKID
(
pTaskInfo
),
pInfo
->
countOnly
,
&
pInfo
->
pIgnoreTables
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
...
...
@@ -887,8 +887,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableKeyInfo
*
pList
=
NULL
;
tableListGetGroupList
(
pInfo
->
base
.
pTableListInfo
,
pInfo
->
currentGroupId
,
&
pList
,
&
num
);
pAPI
->
tsdReader
.
storeReaderSet
TableList
(
pInfo
->
base
.
dataReader
,
pList
,
num
);
pAPI
->
tsdReader
.
store
ReaderResetStatus
(
pInfo
->
base
.
dataReader
,
&
pInfo
->
base
.
cond
);
pAPI
->
tsdReader
.
tsdSetQuery
TableList
(
pInfo
->
base
.
dataReader
,
pList
,
num
);
pAPI
->
tsdReader
.
tsd
ReaderResetStatus
(
pInfo
->
base
.
dataReader
,
&
pInfo
->
base
.
cond
);
pInfo
->
scanTimes
=
0
;
result
=
doGroupedTableScan
(
pOperator
);
...
...
@@ -910,10 +910,10 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr
return
0
;
}
static
void
destroyTableScanBase
(
STableScanBase
*
pBase
,
SStoreTSD
Reader
*
pAPI
)
{
static
void
destroyTableScanBase
(
STableScanBase
*
pBase
,
Tsd
Reader
*
pAPI
)
{
cleanupQueryTableDataCond
(
&
pBase
->
cond
);
pAPI
->
store
ReaderClose
(
pBase
->
dataReader
);
pAPI
->
tsd
ReaderClose
(
pBase
->
dataReader
);
pBase
->
dataReader
=
NULL
;
if
(
pBase
->
matchInfo
.
pList
!=
NULL
)
{
...
...
@@ -1074,7 +1074,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6
pTableScanInfo
->
base
.
cond
.
endVersion
=
version
;
pTableScanInfo
->
scanTimes
=
0
;
pTableScanInfo
->
currentGroupId
=
-
1
;
pTableScanInfo
->
readerAPI
.
store
ReaderClose
(
pTableScanInfo
->
base
.
dataReader
);
pTableScanInfo
->
readerAPI
.
tsd
ReaderClose
(
pTableScanInfo
->
base
.
dataReader
);
pTableScanInfo
->
base
.
dataReader
=
NULL
;
}
...
...
@@ -1094,7 +1094,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock
*
pBlock
=
pTableScanInfo
->
pResBlock
;
STsdbReader
*
pReader
=
NULL
;
int32_t
code
=
pAPI
->
tsdReader
.
store
ReaderOpen
(
pTableScanInfo
->
base
.
readHandle
.
vnode
,
&
cond
,
&
tblInfo
,
1
,
pBlock
,
int32_t
code
=
pAPI
->
tsdReader
.
tsd
ReaderOpen
(
pTableScanInfo
->
base
.
readHandle
.
vnode
,
&
cond
,
&
tblInfo
,
1
,
pBlock
,
(
void
**
)
&
pReader
,
GET_TASKID
(
pTaskInfo
),
false
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
...
...
@@ -1103,7 +1103,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
bool
hasNext
=
false
;
code
=
pAPI
->
tsdReader
.
store
ReaderNextDataBlock
(
pReader
,
&
hasNext
);
code
=
pAPI
->
tsdReader
.
tsd
ReaderNextDataBlock
(
pReader
,
&
hasNext
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
...
...
@@ -1111,12 +1111,12 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
if
(
hasNext
)
{
/*SSDataBlock* p = */
pAPI
->
tsdReader
.
store
ReaderRetrieveDataBlock
(
pReader
,
NULL
);
/*SSDataBlock* p = */
pAPI
->
tsdReader
.
tsd
ReaderRetrieveDataBlock
(
pReader
,
NULL
);
doSetTagColumnData
(
&
pTableScanInfo
->
base
,
pBlock
,
pTaskInfo
,
pBlock
->
info
.
rows
);
pBlock
->
info
.
id
.
groupId
=
getTableGroupId
(
pTableScanInfo
->
base
.
pTableListInfo
,
pBlock
->
info
.
id
.
uid
);
}
pAPI
->
tsdReader
.
store
ReaderClose
(
pReader
);
pAPI
->
tsdReader
.
tsd
ReaderClose
(
pReader
);
qDebug
(
"retrieve prev rows:%"
PRId64
", skey:%"
PRId64
", ekey:%"
PRId64
" uid:%"
PRIu64
", max ver:%"
PRId64
", suid:%"
PRIu64
,
pBlock
->
info
.
rows
,
startTs
,
endTs
,
tbUid
,
maxVersion
,
cond
.
suid
);
...
...
@@ -1249,7 +1249,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
*
pRowIndex
=
0
;
pInfo
->
updateWin
=
(
STimeWindow
){.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
pTableScanInfo
->
readerAPI
.
store
ReaderClose
(
pTableScanInfo
->
base
.
dataReader
);
pTableScanInfo
->
readerAPI
.
tsd
ReaderClose
(
pTableScanInfo
->
base
.
dataReader
);
pTableScanInfo
->
base
.
dataReader
=
NULL
;
return
NULL
;
}
...
...
@@ -1657,7 +1657,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
return
pResult
;
}
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
pAPI
->
tsdReader
.
store
ReaderClose
(
pTSInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderClose
(
pTSInfo
->
base
.
dataReader
);
pTSInfo
->
base
.
dataReader
=
NULL
;
qDebug
(
"queue scan tsdb over, switch to wal ver %"
PRId64
""
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
);
...
...
@@ -1821,7 +1821,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo
->
streamInfo
.
recoverStep
=
STREAM_RECOVER_STEP__SCAN2
;
}
pAPI
->
tsdReader
.
store
ReaderClose
(
pTSInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderClose
(
pTSInfo
->
base
.
dataReader
);
pTSInfo
->
base
.
dataReader
=
NULL
;
pInfo
->
pTableScanOp
->
status
=
OP_OPENED
;
...
...
@@ -1901,7 +1901,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
pTaskInfo
->
streamInfo
.
recoverStep
=
STREAM_RECOVER_STEP__NONE
;
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
pAPI
->
tsdReader
.
store
ReaderClose
(
pTSInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderClose
(
pTSInfo
->
base
.
dataReader
);
pTSInfo
->
base
.
dataReader
=
NULL
;
...
...
@@ -2162,20 +2162,20 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if
(
pTaskInfo
->
streamInfo
.
currentOffset
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
bool
hasNext
=
false
;
if
(
pInfo
->
dataReader
)
{
code
=
pAPI
->
tsdReader
.
store
ReaderNextDataBlock
(
pInfo
->
dataReader
,
&
hasNext
);
code
=
pAPI
->
tsdReader
.
tsd
ReaderNextDataBlock
(
pInfo
->
dataReader
,
&
hasNext
);
if
(
code
)
{
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pInfo
->
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pInfo
->
dataReader
);
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
if
(
pInfo
->
dataReader
&&
hasNext
)
{
if
(
isTaskKilled
(
pTaskInfo
))
{
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
pInfo
->
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
pInfo
->
dataReader
);
T_LONG_JMP
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
SSDataBlock
*
pBlock
=
pAPI
->
tsdReader
.
store
ReaderRetrieveDataBlock
(
pInfo
->
dataReader
,
NULL
);
SSDataBlock
*
pBlock
=
pAPI
->
tsdReader
.
tsd
ReaderRetrieveDataBlock
(
pInfo
->
dataReader
,
NULL
);
if
(
pBlock
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
...
...
@@ -2227,7 +2227,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
static
void
destroyRawScanOperatorInfo
(
void
*
param
)
{
SStreamRawScanInfo
*
pRawScan
=
(
SStreamRawScanInfo
*
)
param
;
pRawScan
->
pAPI
->
tsdReader
.
store
ReaderClose
(
pRawScan
->
dataReader
);
pRawScan
->
pAPI
->
tsdReader
.
tsd
ReaderClose
(
pRawScan
->
dataReader
);
pRawScan
->
pAPI
->
snapshotFn
.
destroySnapshot
(
pRawScan
->
sContext
);
tableListDestroy
(
pRawScan
->
pTableListInfo
);
taosMemoryFree
(
pRawScan
);
...
...
@@ -2662,7 +2662,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SReadHandle
*
pHandle
=
&
pInfo
->
base
.
readHandle
;
if
(
NULL
==
source
->
dataReader
||
!
source
->
multiReader
)
{
code
=
pAPI
->
tsdReader
.
store
ReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
p
,
1
,
pBlock
,
(
void
**
)
&
source
->
dataReader
,
GET_TASKID
(
pTaskInfo
),
false
,
NULL
);
code
=
pAPI
->
tsdReader
.
tsd
ReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
p
,
1
,
pBlock
,
(
void
**
)
&
source
->
dataReader
,
GET_TASKID
(
pTaskInfo
),
false
,
NULL
);
if
(
code
!=
0
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -2674,9 +2674,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
qTrace
(
"tsdb/read-table-data: %p, enter next reader"
,
reader
);
while
(
true
)
{
code
=
pAPI
->
tsdReader
.
store
ReaderNextDataBlock
(
reader
,
&
hasNext
);
code
=
pAPI
->
tsdReader
.
tsd
ReaderNextDataBlock
(
reader
,
&
hasNext
);
if
(
code
!=
0
)
{
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
reader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
reader
);
pInfo
->
base
.
dataReader
=
NULL
;
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -2686,7 +2686,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
if
(
isTaskKilled
(
pTaskInfo
))
{
pAPI
->
tsdReader
.
store
ReaderReleaseDataBlock
(
reader
);
pAPI
->
tsdReader
.
tsd
ReaderReleaseDataBlock
(
reader
);
pInfo
->
base
.
dataReader
=
NULL
;
T_LONG_JMP
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
...
...
@@ -2726,7 +2726,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
qTrace
(
"tsdb/read-table-data: %p, close reader"
,
reader
);
if
(
!
source
->
multiReader
)
{
pAPI
->
tsdReader
.
store
ReaderClose
(
pInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderClose
(
pInfo
->
base
.
dataReader
);
source
->
dataReader
=
NULL
;
}
pInfo
->
base
.
dataReader
=
NULL
;
...
...
@@ -2734,7 +2734,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
if
(
!
source
->
multiReader
)
{
pAPI
->
tsdReader
.
store
ReaderClose
(
pInfo
->
base
.
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderClose
(
pInfo
->
base
.
dataReader
);
source
->
dataReader
=
NULL
;
}
pInfo
->
base
.
dataReader
=
NULL
;
...
...
@@ -2853,7 +2853,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
for
(
int32_t
i
=
0
;
i
<
numOfTable
;
++
i
)
{
STableMergeScanSortSourceParam
*
param
=
taosArrayGet
(
pInfo
->
sortSourceParams
,
i
);
blockDataDestroy
(
param
->
inputBlock
);
pAPI
->
tsdReader
.
store
ReaderClose
(
param
->
dataReader
);
pAPI
->
tsdReader
.
tsd
ReaderClose
(
param
->
dataReader
);
param
->
dataReader
=
NULL
;
}
taosArrayClear
(
pInfo
->
sortSourceParams
);
...
...
@@ -2965,11 +2965,11 @@ void destroyTableMergeScanOperatorInfo(void* param) {
for
(
int32_t
i
=
0
;
i
<
numOfTable
;
i
++
)
{
STableMergeScanSortSourceParam
*
p
=
taosArrayGet
(
pTableScanInfo
->
sortSourceParams
,
i
);
blockDataDestroy
(
p
->
inputBlock
);
pTableScanInfo
->
base
.
readerAPI
.
store
ReaderClose
(
p
->
dataReader
);
pTableScanInfo
->
base
.
readerAPI
.
tsd
ReaderClose
(
p
->
dataReader
);
p
->
dataReader
=
NULL
;
}
pTableScanInfo
->
base
.
readerAPI
.
store
ReaderClose
(
pTableScanInfo
->
base
.
dataReader
);
pTableScanInfo
->
base
.
readerAPI
.
tsd
ReaderClose
(
pTableScanInfo
->
base
.
dataReader
);
pTableScanInfo
->
base
.
dataReader
=
NULL
;
taosArrayDestroy
(
pTableScanInfo
->
sortSourceParams
);
...
...
source/libs/executor/src/sysscanoperator.c
浏览文件 @
85afbf0d
...
...
@@ -2198,7 +2198,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
pAPI
->
tsdReader
.
store
ReaderGetDataBlockDistInfo
(
pBlockScanInfo
->
pHandle
,
&
blockDistInfo
);
pAPI
->
tsdReader
.
tsd
ReaderGetDataBlockDistInfo
(
pBlockScanInfo
->
pHandle
,
&
blockDistInfo
);
blockDistInfo
.
numOfInmemRows
=
(
int32_t
)
pAPI
->
metaFn
.
getNumOfRowsInMem
(
pBlockScanInfo
->
pHandle
);
SSDataBlock
*
pBlock
=
pBlockScanInfo
->
pResBlock
;
...
...
@@ -2229,7 +2229,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
static
void
destroyBlockDistScanOperatorInfo
(
void
*
param
)
{
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
blockDataDestroy
(
pDistInfo
->
pResBlock
);
pDistInfo
->
readHandle
.
api
.
tsdReader
.
store
ReaderClose
(
pDistInfo
->
pHandle
);
pDistInfo
->
readHandle
.
api
.
tsdReader
.
tsd
ReaderClose
(
pDistInfo
->
pHandle
);
tableListDestroy
(
pDistInfo
->
pTableListInfo
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -2284,7 +2284,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t
num
=
tableListGetSize
(
pTableListInfo
);
void
*
pList
=
tableListGetInfo
(
pTableListInfo
,
0
);
code
=
readHandle
->
api
.
tsdReader
.
store
ReaderOpen
(
readHandle
->
vnode
,
&
cond
,
pList
,
num
,
pInfo
->
pResBlock
,
(
void
**
)
&
pInfo
->
pHandle
,
pTaskInfo
->
id
.
str
,
false
,
NULL
);
code
=
readHandle
->
api
.
tsdReader
.
tsd
ReaderOpen
(
readHandle
->
vnode
,
&
cond
,
pList
,
num
,
pInfo
->
pResBlock
,
(
void
**
)
&
pInfo
->
pHandle
,
pTaskInfo
->
id
.
str
,
false
,
NULL
);
cleanupQueryTableDataCond
(
&
cond
);
if
(
code
!=
0
)
{
goto
_error
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录