Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8d9cc366
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8d9cc366
编写于
5月 04, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode_refact1
上级
cfada1cc
d20d7872
变更
9
展开全部
隐藏空白更改
内联
并排
Showing
9 changed file
with
1710 addition
and
1898 deletion
+1710
-1898
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+2
-1
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+26
-8
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+3
-5
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+199
-1862
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+7
-6
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+8
-13
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+1462
-0
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+2
-2
未找到文件。
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
8d9cc366
...
...
@@ -155,7 +155,6 @@ static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
static
void
changeQueryHandleForInterpQuery
(
tsdbReaderT
pHandle
);
static
void
doMergeTwoLevelData
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableCheckInfo
*
pCheckInfo
,
SBlock
*
pBlock
);
static
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
static
int32_t
tsdbReadRowsFromCache
(
STableCheckInfo
*
pCheckInfo
,
TSKEY
maxKey
,
int
maxRowsToRead
,
STimeWindow
*
win
,
STsdbReadHandle
*
pTsdbReadHandle
);
static
int32_t
tsdbCheckInfoCompar
(
const
void
*
key1
,
const
void
*
key2
);
...
...
@@ -1337,6 +1336,8 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
return
code
;
}
static
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
static
int32_t
loadFileDataBlock
(
STsdbReadHandle
*
pTsdbReadHandle
,
SBlock
*
pBlock
,
STableCheckInfo
*
pCheckInfo
,
bool
*
exists
)
{
SQueryFilePos
*
cur
=
&
pTsdbReadHandle
->
cur
;
...
...
source/libs/executor/inc/executil.h
浏览文件 @
8d9cc366
...
...
@@ -73,7 +73,7 @@ typedef struct SResKeyPos {
}
SResKeyPos
;
typedef
struct
SResultRowInfo
{
SResultRowPosition
*
pPosition
;
SResultRowPosition
*
pPosition
;
// todo remove this
int32_t
size
;
// number of result set
int32_t
capacity
;
// max capacity
SResultRowPosition
cur
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
8d9cc366
...
...
@@ -197,7 +197,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggS
struct
SOptrBasicInfo
*
pInfo
,
char
*
result
,
int32_t
length
);
typedef
int32_t
(
*
__optr_open_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
bool
*
newgroup
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
,
int32_t
num
);
typedef
int32_t
(
*
__optr_get_explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
);
...
...
@@ -425,7 +425,7 @@ typedef struct STimeWindowSupp {
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
}
STimeWindowAggSupp
;
typedef
struct
S
TableInterval
OperatorInfo
{
typedef
struct
S
IntervalAgg
OperatorInfo
{
SOptrBasicInfo
binfo
;
// basic info
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SInterval
interval
;
// interval info
...
...
@@ -440,7 +440,7 @@ typedef struct STableIntervalOperatorInfo {
SArray
*
pUpdatedWindow
;
// updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp
twAggSup
;
struct
SFillInfo
*
pFillInfo
;
// fill info
}
S
TableInterval
OperatorInfo
;
}
S
IntervalAgg
OperatorInfo
;
typedef
struct
SAggOperatorInfo
{
SOptrBasicInfo
binfo
;
...
...
@@ -533,6 +533,7 @@ typedef struct SSessionAggOperatorInfo {
SWindowRowsSup
winSup
;
bool
reptScan
;
// next round scan
int64_t
gap
;
// session window gap
int32_t
tsSlotId
;
// primary timestamp slot id
STimeWindowAggSupp
twAggSup
;
}
SSessionAggOperatorInfo
;
...
...
@@ -550,6 +551,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t
colIndex
;
// start row index
bool
hasKey
;
SStateKeys
stateKey
;
int32_t
tsSlotId
;
// primary timestamp column slot id
STimeWindowAggSupp
twAggSup
;
// bool reptScan;
}
SStateWindowOperatorInfo
;
...
...
@@ -606,6 +608,9 @@ typedef struct SJoinOperatorInfo {
SNode
*
pOnCondition
;
}
SJoinOperatorInfo
;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
streamFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_encode_fn_t
encode
,
__optr_decode_fn_t
decode
,
__optr_get_explain_fn_t
explain
);
...
...
@@ -616,8 +621,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t
initAggInfo
(
SOptrBasicInfo
*
pBasicInfo
,
SAggSupporter
*
pAggSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
size_t
keyBufSize
,
const
char
*
pkey
);
void
initResultSizeInfo
(
SOperatorInfo
*
pOperator
,
int32_t
numOfRows
);
void
doBuildResultDatablock
(
SSDataBlock
*
pBlock
,
SGroupResInfo
*
pGroupResInfo
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
);
void
doBuildResultDatablock
(
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
);
void
finalizeMultiTupleQueryResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
);
void
doApplyFunctions
(
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
...
...
@@ -635,6 +640,16 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
SqlFunctionCtx
*
createSqlFunctionCtx
(
SExprInfo
*
pExprInfo
,
int32_t
numOfOutput
,
int32_t
**
rowCellInfoOffset
);
void
relocateColumnData
(
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
,
SArray
*
pCols
);
void
initExecTimeWindowInfo
(
SColumnInfoData
*
pColData
,
STimeWindow
*
pQueryWindow
);
void
cleanupAggSup
(
SAggSupporter
*
pAggSup
);
void
destroyBasicOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
void
setResultRowInitCtx
(
SResultRow
*
pResult
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
);
SResultRow
*
doSetResultOutBufByKey
(
SDiskbasedBuf
*
pResultBuf
,
SResultRowInfo
*
pResultRowInfo
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
groupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
);
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
...
...
@@ -656,10 +671,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
const
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
const
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SSDataBlock
*
pResBlock
,
int64_t
gap
,
int32_t
tsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SNode
*
pCondition
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
,
...
...
@@ -672,11 +689,12 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SInterval
*
pInterval
,
STimeWindow
*
pWindow
,
SSDataBlock
*
pResBlock
,
int32_t
fillType
,
SNodeListNode
*
fillVal
,
bool
multigroupResult
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SSDataBlock
*
pResBlock
,
STimeWindowAggSupp
*
pTwAggSupp
,
int32_t
tsSlotId
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExecTaskInfo
*
pTaskInfo
);
...
...
@@ -697,7 +715,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
void
finalizeQueryResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
void
copyTsColoum
(
SSDataBlock
*
pRes
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
STableQueryInfo
*
createTableQueryInfo
(
void
*
buf
,
bool
groupbyColumn
,
STimeWindow
win
);
STableQueryInfo
*
createTableQueryInfo
(
void
*
buf
,
STimeWindow
win
);
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
8d9cc366
...
...
@@ -154,14 +154,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
qDebug
(
"%s execTask is launched"
,
GET_TASKID
(
pTaskInfo
));
bool
newgroup
=
false
;
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
int64_t
st
=
0
;
st
=
taosGetTimestampUs
();
*
pRes
=
pTaskInfo
->
pRoot
->
fpSet
.
getNextFn
(
pTaskInfo
->
pRoot
,
&
newgroup
);
int64_t
st
=
taosGetTimestampUs
();
*
pRes
=
pTaskInfo
->
pRoot
->
fpSet
.
getNextFn
(
pTaskInfo
->
pRoot
);
uint64_t
el
=
(
taosGetTimestampUs
()
-
st
);
pTaskInfo
->
cost
.
elapsedTime
+=
el
;
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
8d9cc366
此差异已折叠。
点击以展开。
source/libs/executor/src/groupoperator.c
浏览文件 @
8d9cc366
...
...
@@ -256,7 +256,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
static
SSDataBlock
*
hashGroupbyAggregate
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
SSDataBlock
*
hashGroupbyAggregate
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
...
...
@@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pRes
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
,
pInfo
->
binfo
.
rowCellInfoOffset
,
pInfo
->
binfo
.
pCtx
);
doBuildResultDatablock
(
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
...
...
@@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
,
newgroup
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
...
...
@@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
false
);
while
(
1
)
{
doBuildResultDatablock
(
pRes
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
,
pInfo
->
binfo
.
rowCellInfoOffset
,
pInfo
->
binfo
.
pCtx
);
doBuildResultDatablock
(
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pRes
);
bool
hasRemain
=
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
);
...
...
@@ -537,11 +537,12 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
pInfo
->
pageIndex
+=
1
;
blockDataUpdateTsWindow
(
pInfo
->
binfo
.
pRes
);
pInfo
->
binfo
.
pRes
->
info
.
groupId
=
pGroupInfo
->
groupId
;
return
pInfo
->
binfo
.
pRes
;
}
static
SSDataBlock
*
hashPartition
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
SSDataBlock
*
hashPartition
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
...
...
@@ -558,7 +559,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
,
newgroup
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
8d9cc366
...
...
@@ -257,11 +257,9 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
pTableScanInfo
->
cond
.
order
=
TSDB_ORDER_DESC
;
}
static
SSDataBlock
*
doTableScanImpl
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
SSDataBlock
*
doTableScanImpl
(
SOperatorInfo
*
pOperator
)
{
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
SSDataBlock
*
pBlock
=
pTableScanInfo
->
pResBlock
;
*
newgroup
=
false
;
while
(
tsdbNextDataBlock
(
pTableScanInfo
->
dataReader
))
{
if
(
isTaskKilled
(
pOperator
->
pTaskInfo
))
{
...
...
@@ -289,7 +287,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
return
NULL
;
}
static
SSDataBlock
*
doTableScan
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
SSDataBlock
*
doTableScan
(
SOperatorInfo
*
pOperator
)
{
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -298,10 +296,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return
NULL
;
}
*
newgroup
=
false
;
while
(
pTableScanInfo
->
current
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
SSDataBlock
*
p
=
doTableScanImpl
(
pOperator
,
newgroup
);
SSDataBlock
*
p
=
doTableScanImpl
(
pOperator
);
if
(
p
!=
NULL
)
{
return
p
;
}
...
...
@@ -334,7 +330,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
while
(
pTableScanInfo
->
current
<
total
)
{
SSDataBlock
*
p
=
doTableScanImpl
(
pOperator
,
newgroup
);
SSDataBlock
*
p
=
doTableScanImpl
(
pOperator
);
if
(
p
!=
NULL
)
{
return
p
;
}
...
...
@@ -421,13 +417,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
return
pOperator
;
}
static
SSDataBlock
*
doBlockInfoScan
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
SSDataBlock
*
doBlockInfoScan
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
*
newgroup
=
false
;
STableBlockDistInfo
tableBlockDist
=
{
0
};
tableBlockDist
.
numOfTables
=
1
;
// TODO set the correct number of tables
...
...
@@ -514,7 +509,7 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear
(
pInfo
->
pBlockLists
);
}
static
SSDataBlock
*
doStreamBlockScan
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
SSDataBlock
*
doStreamBlockScan
(
SOperatorInfo
*
pOperator
)
{
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamBlockScanInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -859,7 +854,7 @@ static SSDataBlock* buildSysTableMetaBlock() {
return
pBlock
;
}
static
SSDataBlock
*
doSysTableScan
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
SSDataBlock
*
doSysTableScan
(
SOperatorInfo
*
pOperator
)
{
// build message and send to mnode to fetch the content of system tables.
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSysTableScanInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -1191,7 +1186,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
return
pOperator
;
}
static
SSDataBlock
*
doTagScan
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
static
SSDataBlock
*
doTagScan
(
SOperatorInfo
*
pOperator
)
{
#if 0
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
...
...
source/libs/executor/src/timewindowoperator.c
0 → 100644
浏览文件 @
8d9cc366
此差异已折叠。
点击以展开。
source/libs/executor/test/executorTests.cpp
浏览文件 @
8d9cc366
...
...
@@ -55,7 +55,7 @@ typedef struct SDummyInputInfo {
SSDataBlock
*
pBlock
;
}
SDummyInputInfo
;
SSDataBlock
*
getDummyBlock
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
SSDataBlock
*
getDummyBlock
(
SOperatorInfo
*
pOperator
)
{
SDummyInputInfo
*
pInfo
=
static_cast
<
SDummyInputInfo
*>
(
pOperator
->
info
);
if
(
pInfo
->
current
>=
pInfo
->
totalPages
)
{
return
NULL
;
...
...
@@ -121,7 +121,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
return
pBlock
;
}
SSDataBlock
*
get2ColsDummyBlock
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
SSDataBlock
*
get2ColsDummyBlock
(
SOperatorInfo
*
pOperator
)
{
SDummyInputInfo
*
pInfo
=
static_cast
<
SDummyInputInfo
*>
(
pOperator
->
info
);
if
(
pInfo
->
current
>=
pInfo
->
totalPages
)
{
return
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录