Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b7c8346b
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b7c8346b
编写于
4月 28, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
other: merge other branch.
上级
2984c5a7
f9a64cbc
变更
35
展开全部
隐藏空白更改
内联
并排
Showing
35 changed file
with
1851 addition
and
1728 deletion
+1851
-1728
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+11
-9
source/dnode/vnode/src/tq/tqRestore.c
source/dnode/vnode/src/tq/tqRestore.c
+1
-1
source/dnode/vnode/src/tq/tqScan.c
source/dnode/vnode/src/tq/tqScan.c
+1
-0
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+0
-2
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+640
-4
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+0
-882
source/libs/executor/inc/operator.h
source/libs/executor/inc/operator.h
+166
-0
source/libs/executor/inc/querytask.h
source/libs/executor/inc/querytask.h
+112
-0
source/libs/executor/src/aggregateoperator.c
source/libs/executor/src/aggregateoperator.c
+5
-8
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+3
-1
source/libs/executor/src/dataDeleter.c
source/libs/executor/src/dataDeleter.c
+1
-1
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+1
-1
source/libs/executor/src/dataInserter.c
source/libs/executor/src/dataInserter.c
+1
-1
source/libs/executor/src/eventwindowoperator.c
source/libs/executor/src/eventwindowoperator.c
+3
-1
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+8
-6
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+2
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+26
-5
source/libs/executor/src/executorInt.c
source/libs/executor/src/executorInt.c
+3
-779
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+4
-2
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+2
-1
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+3
-1
source/libs/executor/src/operator.c
source/libs/executor/src/operator.c
+578
-0
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+3
-1
source/libs/executor/src/querytask.c
source/libs/executor/src/querytask.c
+235
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+4
-2
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+3
-1
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+4
-1
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+1
-1
source/libs/executor/src/timesliceoperator.c
source/libs/executor/src/timesliceoperator.c
+3
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+10
-3
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+2
-5
source/libs/executor/test/lhashTests.cpp
source/libs/executor/test/lhashTests.cpp
+1
-1
source/libs/executor/test/sortTests.cpp
source/libs/executor/test/sortTests.cpp
+1
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+8
-4
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+5
-1
未找到文件。
source/dnode/vnode/src/tq/tq.c
浏览文件 @
b7c8346b
...
...
@@ -821,13 +821,18 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
}
// do recovery step 1
streamSourceRecoverScanStep1
(
pTask
);
tqDebug
(
"s-task:%s start recover step 1 scan"
,
pTask
->
id
.
idStr
);
int64_t
st
=
taosGetTimestampMs
();
streamSourceRecoverScanStep1
(
pTask
);
if
(
atomic_load_8
(
&
pTask
->
status
.
taskStatus
)
==
TASK_STATUS__DROPPING
)
{
streamMetaReleaseTask
(
pTq
->
pStreamMeta
,
pTask
);
return
0
;
}
double
el
=
(
taosGetTimestampMs
()
-
st
)
/
1000
.
0
;
tqDebug
(
"s-task:%s recover step 1 ended, elapsed time:%.2fs"
,
pTask
->
id
.
idStr
,
el
);
// build msg to launch next step
SStreamRecoverStep2Req
req
;
code
=
streamBuildSourceRecover2Req
(
pTask
,
&
req
);
...
...
@@ -853,20 +858,17 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
memcpy
(
serializedReq
,
&
req
,
len
);
// dispatch msg
SRpcMsg
rpcMsg
=
{
.
code
=
0
,
.
contLen
=
len
,
.
msgType
=
TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE
,
.
pCont
=
serializedReq
,
};
tqDebug
(
"s-task:%s start recover block stage"
,
pTask
->
id
.
idStr
);
SRpcMsg
rpcMsg
=
{
.
code
=
0
,
.
contLen
=
len
,
.
msgType
=
TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE
,
.
pCont
=
serializedReq
};
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
rpcMsg
);
return
0
;
}
int32_t
tqProcessTaskRecover2Req
(
STQ
*
pTq
,
int64_t
sversion
,
char
*
msg
,
int32_t
msgLen
)
{
int32_t
code
;
int32_t
code
=
0
;
SStreamRecoverStep2Req
*
pReq
=
(
SStreamRecoverStep2Req
*
)
msg
;
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pTq
->
pStreamMeta
,
pReq
->
taskId
);
if
(
pTask
==
NULL
)
{
...
...
source/dnode/vnode/src/tq/tqRestore.c
浏览文件 @
b7c8346b
...
...
@@ -107,7 +107,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if
(
streamTaskShouldStop
(
&
pTask
->
status
)
||
status
==
TASK_STATUS__RECOVER_PREPARE
||
status
==
TASK_STATUS__WAIT_DOWNSTREAM
||
status
==
TASK_STATUS__PAUSE
)
{
tqDebug
(
"s-task:%s
skip push data, not ready for processing, status
%d"
,
pTask
->
id
.
idStr
,
status
);
tqDebug
(
"s-task:%s
not ready for new submit block from wal, status:
%d"
,
pTask
->
id
.
idStr
,
status
);
streamMetaReleaseTask
(
pStreamMeta
,
pTask
);
continue
;
}
...
...
source/dnode/vnode/src/tq/tqScan.c
浏览文件 @
b7c8346b
...
...
@@ -130,6 +130,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
tqError
(
"vgId:%d, task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
return
-
1
;
}
tqDebug
(
"tmqsnap task execute end, get %p"
,
pDataBlock
);
if
(
pDataBlock
!=
NULL
&&
pDataBlock
->
info
.
rows
>
0
)
{
...
...
source/libs/executor/inc/executil.h
浏览文件 @
b7c8346b
...
...
@@ -39,8 +39,6 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
typedef
struct
SGroupResInfo
{
int32_t
index
;
SArray
*
pRows
;
// SArray<SResKeyPos>
...
...
source/libs/executor/inc/executorInt.h
浏览文件 @
b7c8346b
此差异已折叠。
点击以展开。
source/libs/executor/inc/executorimpl.h
浏览文件 @
b7c8346b
此差异已折叠。
点击以展开。
source/libs/executor/inc/operator.h
0 → 100644
浏览文件 @
b7c8346b
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OPERATOR_H
#define TDENGINE_OPERATOR_H
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SOperatorCostInfo
{
double
openCost
;
double
totalCost
;
}
SOperatorCostInfo
;
struct
SOperatorInfo
;
typedef
int32_t
(
*
__optr_encode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
);
typedef
int32_t
(
*
__optr_decode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
*
result
);
typedef
int32_t
(
*
__optr_open_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
);
typedef
int32_t
(
*
__optr_explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
);
typedef
int32_t
(
*
__optr_reqBuf_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
struct
SOperatorFpSet
{
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_reqBuf_fn_t
reqBufFn
;
// total used buffer for blocking operator
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
__optr_explain_fn_t
getExplainFn
;
}
SOperatorFpSet
;
enum
{
OP_NOT_OPENED
=
0x0
,
OP_OPENED
=
0x1
,
OP_RES_TO_RETURN
=
0x5
,
OP_EXEC_DONE
=
0x9
,
};
typedef
struct
SOperatorInfo
{
uint16_t
operatorType
;
int16_t
resultDataBlockId
;
bool
blocking
;
// block operator or not
uint8_t
status
;
// denote if current operator is completed
char
*
name
;
// name, for debug purpose
void
*
info
;
// extension attribution
SExprSupp
exprSupp
;
SExecTaskInfo
*
pTaskInfo
;
SOperatorCostInfo
cost
;
SResultInfo
resultInfo
;
struct
SOperatorInfo
**
pDownstream
;
// downstram pointer list
int32_t
numOfDownstream
;
// number of downstream. The value is always ONE expect for join operator
SOperatorFpSet
fpSet
;
}
SOperatorInfo
;
// operator creater functions
// clang-format off
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
SExchangePhysiNode
*
pExNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableList
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTagScanOperatorInfo
(
SReadHandle
*
pReadHandle
,
STagScanPhysiNode
*
pPhyNode
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSysTableScanOperatorInfo
(
void
*
readHandle
,
SSystemTableScanPhysiNode
*
pScanPhyNode
,
const
char
*
pUser
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableCountScanOperatorInfo
(
SReadHandle
*
handle
,
STableCountScanPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SAggPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIndefinitOutputOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SProjectPhysiNode
*
pProjPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSortPhysiNode
*
pSortNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMultiwayMergeOperatorInfo
(
SOperatorInfo
**
dowStreams
,
size_t
numStreams
,
SMergePhysiNode
*
pMergePhysiNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createCacherowsScanOperator
(
SLastRowScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SIntervalPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SMergeIntervalPhysiNode
*
pIntervalPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeAlignedIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SMergeAlignedIntervalPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SSessionWinodwPhysiNode
*
pSessionNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SAggPhysiNode
*
pAggNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createDataBlockInfoScanOperator
(
SReadHandle
*
readHandle
,
SBlockDistScanPhysiNode
*
pBlockScanNode
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SNode
*
pTagCond
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createRawScanOperatorInfo
(
SReadHandle
*
pHandle
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SStateWinodwPhysiNode
*
pStateNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SStreamPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamStateAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SStreamFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SGroupSortPhysiNode
*
pSortPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createEventwindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
physiNode
,
SExecTaskInfo
*
pTaskInfo
);
// clang-format on
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_reqBuf_fn_t
reqBufFn
,
__optr_explain_fn_t
explain
);
int32_t
optrDummyOpenFn
(
SOperatorInfo
*
pOperator
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
void
setOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
setOperatorInfo
(
SOperatorInfo
*
pOperator
,
const
char
*
name
,
int32_t
type
,
bool
blocking
,
int32_t
status
,
void
*
pInfo
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
optrDefaultBufFn
(
SOperatorInfo
*
pOperator
);
SOperatorInfo
*
createOperator
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
pUser
,
const
char
*
dbname
);
void
destroyOperator
(
SOperatorInfo
*
pOperator
);
SOperatorInfo
*
extractOperatorInTree
(
SOperatorInfo
*
pOperator
,
int32_t
type
,
const
char
*
id
);
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
,
bool
inheritUsOrder
);
int32_t
stopTableScanOperator
(
SOperatorInfo
*
pOperator
,
const
char
*
pIdStr
);
int32_t
getOperatorExplainExecInfo
(
struct
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_OPERATOR_H
\ No newline at end of file
source/libs/executor/inc/querytask.h
0 → 100644
浏览文件 @
b7c8346b
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERYTASK_H
#define TDENGINE_QUERYTASK_H
#ifdef __cplusplus
extern
"C"
{
#endif
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
enum
{
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED
=
0x1u
,
/* Task is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
TASK_COMPLETED
=
0x2u
,
};
typedef
struct
STaskIdInfo
{
uint64_t
queryId
;
// this is also a request id
uint64_t
subplanId
;
uint64_t
templateId
;
char
*
str
;
int32_t
vgId
;
}
STaskIdInfo
;
typedef
struct
STaskCostInfo
{
int64_t
created
;
int64_t
start
;
uint64_t
elapsedTime
;
double
extractListTime
;
double
groupIdMapTime
;
SFileBlockLoadRecorder
*
pRecoder
;
}
STaskCostInfo
;
typedef
struct
STaskStopInfo
{
SRWLatch
lock
;
SArray
*
pStopInfo
;
}
STaskStopInfo
;
typedef
struct
{
STqOffsetVal
currentOffset
;
// for tmq
SMqMetaRsp
metaRsp
;
// for tmq fetching meta
int64_t
snapshotVer
;
SPackedData
submit
;
// todo remove it
SSchemaWrapper
*
schema
;
char
tbName
[
TSDB_TABLE_NAME_LEN
];
// this is the current scan table: todo refactor
int8_t
recoverStep
;
int8_t
recoverScanFinished
;
SQueryTableDataCond
tableCond
;
int64_t
fillHistoryVer1
;
int64_t
fillHistoryVer2
;
SStreamState
*
pState
;
int64_t
dataVersion
;
int64_t
checkPointId
;
}
SStreamTaskInfo
;
struct
SExecTaskInfo
{
STaskIdInfo
id
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
int32_t
qbufQuota
;
// total available buffer (in KB) during execution query
int64_t
version
;
// used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo
streamInfo
;
SSchemaInfo
schemaInfo
;
const
char
*
sql
;
// query sql string
jmp_buf
env
;
// jump to this position when error happens.
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
SSubplan
*
pSubplan
;
struct
SOperatorInfo
*
pRoot
;
SLocalFetch
localFetch
;
SArray
*
pResultBlockList
;
// result block list
STaskStopInfo
stopInfo
;
SRWLatch
lock
;
// secure the access of STableListInfo
};
void
buildTaskId
(
uint64_t
taskId
,
uint64_t
queryId
,
char
*
dst
);
SExecTaskInfo
*
doCreateTask
(
uint64_t
queryId
,
uint64_t
taskId
,
int32_t
vgId
,
EOPTR_EXEC_MODEL
model
);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
);
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
rspCode
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
int32_t
vgId
,
char
*
sql
,
EOPTR_EXEC_MODEL
model
);
int32_t
qAppendTaskStopInfo
(
SExecTaskInfo
*
pTaskInfo
,
SExchangeOpStopInfo
*
pInfo
);
SArray
*
getTableListInfo
(
const
SExecTaskInfo
*
pTaskInfo
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QUERYTASK_H
source/libs/executor/src/aggregateoperator.c
浏览文件 @
b7c8346b
...
...
@@ -15,24 +15,21 @@
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "ttime.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "index.h"
#include "operator.h"
#include "query.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
typedef
struct
{
bool
hasAgg
;
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
b7c8346b
...
...
@@ -20,7 +20,9 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "operator.h"
#include "querytask.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
...
...
source/libs/executor/src/dataDeleter.c
浏览文件 @
b7c8346b
...
...
@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
b7c8346b
...
...
@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
...
...
source/libs/executor/src/dataInserter.c
浏览文件 @
b7c8346b
...
...
@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
...
...
source/libs/executor/src/eventwindowoperator.c
浏览文件 @
b7c8346b
...
...
@@ -13,10 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
b7c8346b
...
...
@@ -13,17 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "os.h"
#include "tname.h"
#include "tref.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "index.h"
#include "operator.h"
#include "os.h"
#include "query.h"
#include "querytask.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "tname.h"
#include "tref.h"
typedef
struct
SFetchRspHandleWrapper
{
uint32_t
exchangeId
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
b7c8346b
...
...
@@ -24,7 +24,8 @@
#include "ttime.h"
#include "executil.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "querytask.h"
#include "tcompression.h"
typedef
struct
STableListIdInfo
{
...
...
source/libs/executor/src/executor.c
浏览文件 @
b7c8346b
...
...
@@ -14,9 +14,10 @@
*/
#include "executor.h"
#include
<vnode.h>
#include "
executorimpl
.h"
#include
"executorInt.h"
#include "
operator
.h"
#include "planner.h"
#include "querytask.h"
#include "tdatablock.h"
#include "tref.h"
#include "tudf.h"
...
...
@@ -249,7 +250,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
pReaderHandle
,
int32_t
vgId
,
int32_t
*
numOfCols
,
uint64_t
id
)
{
if
(
msg
==
NULL
)
{
// create raw scan
SExecTaskInfo
*
pTaskInfo
=
doCreate
ExecTaskInfo
(
0
,
id
,
vgId
,
OPTR_EXEC_MODEL_QUEUE
,
""
);
SExecTaskInfo
*
pTaskInfo
=
doCreate
Task
(
0
,
id
,
vgId
,
OPTR_EXEC_MODEL_QUEUE
);
if
(
NULL
==
pTaskInfo
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -717,8 +718,6 @@ void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
taosArrayRemove
(
pTaskInfo
->
stopInfo
.
pStopInfo
,
idx
);
}
taosWUnLockLatch
(
&
pTaskInfo
->
stopInfo
.
lock
);
return
;
}
void
qStopTaskOperators
(
SExecTaskInfo
*
pTaskInfo
)
{
...
...
@@ -1303,3 +1302,25 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
taosArrayDestroy
(
plist
);
return
pUidList
;
}
static
void
extractTableList
(
SArray
*
pList
,
const
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pScanInfo
=
pOperator
->
info
;
STableScanInfo
*
pTableScanInfo
=
pScanInfo
->
pTableScanOp
->
info
;
taosArrayPush
(
pList
,
&
pTableScanInfo
->
base
.
pTableListInfo
);
}
else
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
taosArrayPush
(
pList
,
&
pScanInfo
->
base
.
pTableListInfo
);
}
else
{
if
(
pOperator
->
pDownstream
!=
NULL
&&
pOperator
->
pDownstream
[
0
]
!=
NULL
)
{
extractTableList
(
pList
,
pOperator
->
pDownstream
[
0
]);
}
}
}
SArray
*
getTableListInfo
(
const
SExecTaskInfo
*
pTaskInfo
)
{
SArray
*
pArray
=
taosArrayInit
(
0
,
POINTER_BYTES
);
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
extractTableList
(
pArray
,
pOperator
);
return
pArray
;
}
\ No newline at end of file
source/libs/executor/src/executor
impl
.c
→
source/libs/executor/src/executor
Int
.c
浏览文件 @
b7c8346b
此差异已折叠。
点击以展开。
source/libs/executor/src/filloperator.c
浏览文件 @
b7c8346b
...
...
@@ -20,16 +20,18 @@
#include "tmsg.h"
#include "ttypes.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "tcommon.h"
#include "thash.h"
#include "ttime.h"
#include "executorInt.h"
#include "function.h"
#include "querynodes.h"
#include "tdatablock.h"
#include "tfill.h"
#include "operator.h"
#include "querytask.h"
#define FILL_POS_INVALID 0
#define FILL_POS_START 1
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
b7c8346b
...
...
@@ -22,7 +22,8 @@
#include "tmsg.h"
#include "executorInt.h"
#include "executorimpl.h"
#include "operator.h"
#include "querytask.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
b7c8346b
...
...
@@ -13,11 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "executorimpl.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
...
...
source/libs/executor/src/operator.c
0 → 100644
浏览文件 @
b7c8346b
此差异已折叠。
点击以展开。
source/libs/executor/src/projectoperator.c
浏览文件 @
b7c8346b
...
...
@@ -13,9 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SProjectOperatorInfo
{
SOptrBasicInfo
binfo
;
...
...
source/libs/executor/src/querytask.c
0 → 100644
浏览文件 @
b7c8346b
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorInt.h"
#include "index.h"
#include "operator.h"
#include "query.h"
#include "querytask.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
SExecTaskInfo
*
doCreateTask
(
uint64_t
queryId
,
uint64_t
taskId
,
int32_t
vgId
,
EOPTR_EXEC_MODEL
model
)
{
SExecTaskInfo
*
pTaskInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecTaskInfo
));
if
(
pTaskInfo
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
cost
.
created
=
taosGetTimestampUs
();
pTaskInfo
->
execModel
=
model
;
pTaskInfo
->
stopInfo
.
pStopInfo
=
taosArrayInit
(
4
,
sizeof
(
SExchangeOpStopInfo
));
pTaskInfo
->
pResultBlockList
=
taosArrayInit
(
128
,
POINTER_BYTES
);
taosInitRWLatch
(
&
pTaskInfo
->
lock
);
pTaskInfo
->
id
.
vgId
=
vgId
;
pTaskInfo
->
id
.
queryId
=
queryId
;
pTaskInfo
->
id
.
str
=
taosMemoryMalloc
(
64
);
buildTaskId
(
taskId
,
queryId
,
pTaskInfo
->
id
.
str
);
return
pTaskInfo
;
}
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
return
(
0
!=
pTaskInfo
->
code
);
}
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
rspCode
)
{
pTaskInfo
->
code
=
rspCode
;
stopTableScanOperator
(
pTaskInfo
->
pRoot
,
pTaskInfo
->
id
.
str
);
}
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
)
{
if
(
status
==
TASK_NOT_COMPLETED
)
{
pTaskInfo
->
status
=
status
;
}
else
{
// QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
CLEAR_QUERY_STATUS
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
status
|=
status
;
}
}
int32_t
createExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
int32_t
vgId
,
char
*
sql
,
EOPTR_EXEC_MODEL
model
)
{
*
pTaskInfo
=
doCreateTask
(
pPlan
->
id
.
queryId
,
taskId
,
vgId
,
model
);
if
(
*
pTaskInfo
==
NULL
)
{
taosMemoryFree
(
sql
);
return
terrno
;
}
if
(
pHandle
)
{
if
(
pHandle
->
pStateBackend
)
{
(
*
pTaskInfo
)
->
streamInfo
.
pState
=
pHandle
->
pStateBackend
;
}
}
TSWAP
((
*
pTaskInfo
)
->
sql
,
sql
);
(
*
pTaskInfo
)
->
pSubplan
=
pPlan
;
(
*
pTaskInfo
)
->
pRoot
=
createOperator
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
pPlan
->
pTagCond
,
pPlan
->
pTagIndexCond
,
pPlan
->
user
,
pPlan
->
dbFName
);
if
(
NULL
==
(
*
pTaskInfo
)
->
pRoot
)
{
int32_t
code
=
(
*
pTaskInfo
)
->
code
;
doDestroyTask
(
*
pTaskInfo
);
return
code
;
}
else
{
return
TSDB_CODE_SUCCESS
;
}
}
void
cleanupQueriedTableScanInfo
(
SSchemaInfo
*
pSchemaInfo
)
{
taosMemoryFreeClear
(
pSchemaInfo
->
dbname
);
taosMemoryFreeClear
(
pSchemaInfo
->
tablename
);
tDeleteSSchemaWrapper
(
pSchemaInfo
->
sw
);
tDeleteSSchemaWrapper
(
pSchemaInfo
->
qsw
);
}
int32_t
initQueriedTableSchemaInfo
(
SReadHandle
*
pHandle
,
SScanPhysiNode
*
pScanNode
,
const
char
*
dbName
,
SExecTaskInfo
*
pTaskInfo
)
{
SMetaReader
mr
=
{
0
};
if
(
pHandle
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
metaReaderInit
(
&
mr
,
pHandle
->
meta
,
0
);
int32_t
code
=
metaGetTableEntryByUidCache
(
&
mr
,
pScanNode
->
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get the table meta, uid:0x%"
PRIx64
", suid:0x%"
PRIx64
", %s"
,
pScanNode
->
uid
,
pScanNode
->
suid
,
GET_TASKID
(
pTaskInfo
));
metaReaderClear
(
&
mr
);
return
terrno
;
}
SSchemaInfo
*
pSchemaInfo
=
&
pTaskInfo
->
schemaInfo
;
pSchemaInfo
->
tablename
=
taosStrdup
(
mr
.
me
.
name
);
pSchemaInfo
->
dbname
=
taosStrdup
(
dbName
);
if
(
mr
.
me
.
type
==
TSDB_SUPER_TABLE
)
{
pSchemaInfo
->
sw
=
tCloneSSchemaWrapper
(
&
mr
.
me
.
stbEntry
.
schemaRow
);
pSchemaInfo
->
tversion
=
mr
.
me
.
stbEntry
.
schemaTag
.
version
;
}
else
if
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
tDecoderClear
(
&
mr
.
coder
);
tb_uid_t
suid
=
mr
.
me
.
ctbEntry
.
suid
;
code
=
metaGetTableEntryByUidCache
(
&
mr
,
suid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
metaReaderClear
(
&
mr
);
return
terrno
;
}
pSchemaInfo
->
sw
=
tCloneSSchemaWrapper
(
&
mr
.
me
.
stbEntry
.
schemaRow
);
pSchemaInfo
->
tversion
=
mr
.
me
.
stbEntry
.
schemaTag
.
version
;
}
else
{
pSchemaInfo
->
sw
=
tCloneSSchemaWrapper
(
&
mr
.
me
.
ntbEntry
.
schemaRow
);
}
metaReaderClear
(
&
mr
);
pSchemaInfo
->
qsw
=
extractQueriedColumnSchema
(
pScanNode
);
return
TSDB_CODE_SUCCESS
;
}
SSchemaWrapper
*
extractQueriedColumnSchema
(
SScanPhysiNode
*
pScanNode
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pScanNode
->
pScanCols
);
int32_t
numOfTags
=
LIST_LENGTH
(
pScanNode
->
pScanPseudoCols
);
SSchemaWrapper
*
pqSw
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchemaWrapper
));
pqSw
->
pSchema
=
taosMemoryCalloc
(
numOfCols
+
numOfTags
,
sizeof
(
SSchema
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
STargetNode
*
pNode
=
(
STargetNode
*
)
nodesListGetNode
(
pScanNode
->
pScanCols
,
i
);
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
->
pExpr
;
SSchema
*
pSchema
=
&
pqSw
->
pSchema
[
pqSw
->
nCols
++
];
pSchema
->
colId
=
pColNode
->
colId
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
type
;
pSchema
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
tstrncpy
(
pSchema
->
name
,
pColNode
->
colName
,
tListLen
(
pSchema
->
name
));
}
// this the tags and pseudo function columns, we only keep the tag columns
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
STargetNode
*
pNode
=
(
STargetNode
*
)
nodesListGetNode
(
pScanNode
->
pScanPseudoCols
,
i
);
int32_t
type
=
nodeType
(
pNode
->
pExpr
);
if
(
type
==
QUERY_NODE_COLUMN
)
{
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
->
pExpr
;
SSchema
*
pSchema
=
&
pqSw
->
pSchema
[
pqSw
->
nCols
++
];
pSchema
->
colId
=
pColNode
->
colId
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
type
;
pSchema
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
tstrncpy
(
pSchema
->
name
,
pColNode
->
colName
,
tListLen
(
pSchema
->
name
));
}
}
return
pqSw
;
}
static
void
cleanupStreamInfo
(
SStreamTaskInfo
*
pStreamInfo
)
{
tDeleteSSchemaWrapper
(
pStreamInfo
->
schema
);
}
static
void
freeBlock
(
void
*
pParam
)
{
SSDataBlock
*
pBlock
=
*
(
SSDataBlock
**
)
pParam
;
blockDataDestroy
(
pBlock
);
}
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
qDebug
(
"%s execTask is freed"
,
GET_TASKID
(
pTaskInfo
));
destroyOperator
(
pTaskInfo
->
pRoot
);
cleanupQueriedTableScanInfo
(
&
pTaskInfo
->
schemaInfo
);
cleanupStreamInfo
(
&
pTaskInfo
->
streamInfo
);
if
(
!
pTaskInfo
->
localFetch
.
localExec
)
{
nodesDestroyNode
((
SNode
*
)
pTaskInfo
->
pSubplan
);
}
taosArrayDestroyEx
(
pTaskInfo
->
pResultBlockList
,
freeBlock
);
taosArrayDestroy
(
pTaskInfo
->
stopInfo
.
pStopInfo
);
taosMemoryFreeClear
(
pTaskInfo
->
sql
);
taosMemoryFreeClear
(
pTaskInfo
->
id
.
str
);
taosMemoryFreeClear
(
pTaskInfo
);
}
void
buildTaskId
(
uint64_t
taskId
,
uint64_t
queryId
,
char
*
dst
)
{
char
*
p
=
dst
;
int32_t
offset
=
6
;
memcpy
(
p
,
"TID:0x"
,
offset
);
offset
+=
tintToHex
(
taskId
,
&
p
[
offset
]);
memcpy
(
&
p
[
offset
],
" QID:0x"
,
7
);
offset
+=
7
;
offset
+=
tintToHex
(
queryId
,
&
p
[
offset
]);
p
[
offset
]
=
0
;
}
source/libs/executor/src/scanoperator.c
浏览文件 @
b7c8346b
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
...
...
@@ -30,6 +30,8 @@
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
int32_t
scanDebug
=
0
;
...
...
@@ -2300,7 +2302,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
SStreamScanInfo
*
pStreamScan
=
(
SStreamScanInfo
*
)
param
;
if
(
pStreamScan
->
pTableScanOp
&&
pStreamScan
->
pTableScanOp
->
info
)
{
destroyOperator
Info
(
pStreamScan
->
pTableScanOp
);
destroyOperator
(
pStreamScan
->
pTableScanOp
);
}
if
(
pStreamScan
->
tqReader
)
{
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
b7c8346b
...
...
@@ -13,8 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "executorimpl.h"
#include "operator.h"
#include "querytask.h"
#include "tdatablock.h"
typedef
struct
SSortOperatorInfo
{
...
...
source/libs/executor/src/sysscanoperator.c
浏览文件 @
b7c8346b
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
...
...
@@ -31,6 +31,9 @@
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
typedef
int
(
*
__optSysFilter
)(
void
*
a
,
void
*
b
,
int16_t
dtype
);
typedef
int32_t
(
*
__sys_filte
)(
void
*
pMeta
,
SNode
*
cond
,
SArray
*
result
);
...
...
source/libs/executor/src/tfill.c
浏览文件 @
b7c8346b
...
...
@@ -20,7 +20,7 @@
#include "tmsg.h"
#include "ttypes.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "tcommon.h"
#include "thash.h"
#include "ttime.h"
...
...
source/libs/executor/src/timesliceoperator.c
浏览文件 @
b7c8346b
...
...
@@ -12,10 +12,12 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
b7c8346b
...
...
@@ -12,10 +12,12 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
...
...
@@ -27,6 +29,11 @@
#define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
typedef
struct
SStateWindowInfo
{
SResultWindowInfo
winInfo
;
SStateKeys
*
pStateKey
;
}
SStateWindowInfo
;
typedef
struct
SSessionAggOperatorInfo
{
SOptrBasicInfo
binfo
;
...
...
@@ -2798,7 +2805,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
destroyOperator
Info
(
pChild
);
destroyOperator
(
pChild
);
}
taosArrayDestroy
(
pInfo
->
pChildren
);
}
...
...
@@ -3776,7 +3783,7 @@ void destroyStreamStateOperatorInfo(void* param) {
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
destroyOperator
Info
(
pChild
);
destroyOperator
(
pChild
);
}
taosArrayDestroy
(
pInfo
->
pChildren
);
}
...
...
source/libs/executor/test/executorTests.cpp
浏览文件 @
b7c8346b
...
...
@@ -24,15 +24,12 @@
#include "os.h"
#include "executor.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "function.h"
#include "operator.h"
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tname.h"
#include "trpc.h"
#include "tvariant.h"
namespace
{
...
...
source/libs/executor/test/lhashTests.cpp
浏览文件 @
b7c8346b
...
...
@@ -15,7 +15,7 @@
#include <gtest/gtest.h>
#include <iostream>
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "tlinearhash.h"
#pragma GCC diagnostic push
...
...
source/libs/executor/test/sortTests.cpp
浏览文件 @
b7c8346b
...
...
@@ -26,7 +26,7 @@
#include "os.h"
#include "executor.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "taos.h"
#include "tcompare.h"
#include "tdatablock.h"
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
b7c8346b
...
...
@@ -167,20 +167,24 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
batchCnt
++
;
qDebug
(
"
task %d scan exec block num %d, block limit %d"
,
pTask
->
id
.
taskId
,
batchCnt
,
batchSz
);
qDebug
(
"
s-task:%s scan exec block num %d, block limit %d"
,
pTask
->
id
.
idStr
,
batchCnt
,
batchSz
);
if
(
batchCnt
>=
batchSz
)
break
;
if
(
batchCnt
>=
batchSz
)
{
break
;
}
}
if
(
taosArrayGetSize
(
pRes
)
==
0
)
{
if
(
finished
)
{
taosArrayDestroy
(
pRes
);
qDebug
(
"
task %d finish recover exec task "
,
pTask
->
id
.
taskId
);
qDebug
(
"
s-task:%s finish recover exec task "
,
pTask
->
id
.
idStr
);
break
;
}
else
{
qDebug
(
"
task %d continue recover exec task "
,
pTask
->
id
.
taskId
);
qDebug
(
"
s-task:%s continue recover exec task "
,
pTask
->
id
.
idStr
);
continue
;
}
}
SStreamDataBlock
*
qRes
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
,
0
);
if
(
qRes
==
NULL
)
{
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
b7c8346b
...
...
@@ -20,6 +20,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__RECOVER_PREPARE
);
qDebug
(
"s-task:%s set task status:%d and start recover"
,
pTask
->
id
.
idStr
,
pTask
->
status
.
taskStatus
);
streamSetParamForRecover
(
pTask
);
streamSourceRecoverPrepareStep1
(
pTask
,
version
);
...
...
@@ -197,7 +199,6 @@ int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req*
}
int32_t
streamSourceRecoverScanStep1
(
SStreamTask
*
pTask
)
{
//
return
streamScanExec
(
pTask
,
100
);
}
...
...
@@ -210,8 +211,11 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req*
int32_t
streamSourceRecoverScanStep2
(
SStreamTask
*
pTask
,
int64_t
ver
)
{
void
*
exec
=
pTask
->
exec
.
pExecutor
;
qDebug
(
"s-task:%s recover step2(blocking stage) started"
,
pTask
->
id
.
idStr
);
if
(
qStreamSourceRecoverStep2
(
exec
,
ver
)
<
0
)
{
}
return
streamScanExec
(
pTask
,
100
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录