Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8ad85146
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
8ad85146
编写于
11月 21, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
11月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18305 from taosdata/fix/liao_cov
refactor: do some internal refactor, and update the logs.
上级
8552252f
1bd0cc4a
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
13 addition
and
12 deletion
+13
-12
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+9
-11
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+3
-1
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
8ad85146
...
...
@@ -239,6 +239,7 @@ typedef struct SSourceDataInfo {
int32_t
index
;
SRetrieveTableRsp
*
pRsp
;
uint64_t
totalRows
;
int64_t
startTime
;
int32_t
code
;
EX_SOURCE_STATUS
status
;
const
char
*
taskId
;
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
8ad85146
...
...
@@ -44,7 +44,7 @@ typedef struct SFetchRspHandleWrapper {
static
void
destroyExchangeOperatorInfo
(
void
*
param
);
static
void
freeBlock
(
void
*
pParam
);
static
void
freeSourceDataInfo
(
void
*
param
);
static
void
*
setAllSourcesCompleted
(
SOperatorInfo
*
pOperator
,
int64_t
startTs
);
static
void
*
setAllSourcesCompleted
(
SOperatorInfo
*
pOperator
);
static
int32_t
loadRemoteDataCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
);
static
int32_t
doSendFetchDataRequest
(
SExchangeInfo
*
pExchangeInfo
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
sourceIndex
);
...
...
@@ -59,7 +59,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
size_t
totalSources
=
taosArrayGetSize
(
pExchangeInfo
->
pSourceDataInfo
);
int32_t
completed
=
getCompletedSources
(
pExchangeInfo
->
pSourceDataInfo
);
if
(
completed
==
totalSources
)
{
setAllSourcesCompleted
(
pOperator
,
pExchangeInfo
->
openedTs
);
setAllSourcesCompleted
(
pOperator
);
return
;
}
...
...
@@ -113,7 +113,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
taosArrayPush
(
pExchangeInfo
->
pResultBlockList
,
&
pb
);
}
updateLoadRemoteInfo
(
pLoadInfo
,
pRetrieveRsp
->
numOfRows
,
pRetrieveRsp
->
compLen
,
pExchangeInfo
->
openedTs
,
pOperator
);
updateLoadRemoteInfo
(
pLoadInfo
,
pRetrieveRsp
->
numOfRows
,
pRetrieveRsp
->
compLen
,
pDataInfo
->
startTime
,
pOperator
);
pDataInfo
->
totalRows
+=
pRetrieveRsp
->
numOfRows
;
if
(
pRsp
->
completed
==
1
)
{
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
...
...
@@ -388,6 +389,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
SDownstreamSourceNode
*
pSource
=
taosArrayGet
(
pExchangeInfo
->
pSources
,
sourceIndex
);
SSourceDataInfo
*
pDataInfo
=
taosArrayGet
(
pExchangeInfo
->
pSourceDataInfo
,
sourceIndex
);
pDataInfo
->
startTime
=
taosGetTimestampUs
();
ASSERT
(
pDataInfo
->
status
==
EX_SOURCE_DATA_NOT_READY
);
...
...
@@ -493,18 +495,14 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
return
TSDB_CODE_SUCCESS
;
}
void
*
setAllSourcesCompleted
(
SOperatorInfo
*
pOperator
,
int64_t
startTs
)
{
void
*
setAllSourcesCompleted
(
SOperatorInfo
*
pOperator
)
{
SExchangeInfo
*
pExchangeInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int64_t
el
=
taosGetTimestampUs
()
-
startTs
;
SLoadRemoteDataInfo
*
pLoadInfo
=
&
pExchangeInfo
->
loadInfo
;
pLoadInfo
->
totalElapsed
+=
el
;
size_t
totalSources
=
taosArrayGetSize
(
pExchangeInfo
->
pSources
);
qDebug
(
"%s all %"
PRIzu
" sources are exhausted, total rows: %"
PRIu64
"
bytes:%"
PRIu64
"
, elapsed:%.2f ms"
,
GET_TASKID
(
pTaskInfo
),
totalSources
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
,
qDebug
(
"%s all %"
PRIzu
" sources are exhausted, total rows: %"
PRIu64
"
, %.2f Kb
, elapsed:%.2f ms"
,
GET_TASKID
(
pTaskInfo
),
totalSources
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
/
1024
.
0
,
pLoadInfo
->
totalElapsed
/
1000
.
0
);
setOperatorCompleted
(
pOperator
);
...
...
@@ -566,7 +564,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
while
(
1
)
{
if
(
pExchangeInfo
->
current
>=
totalSources
)
{
setAllSourcesCompleted
(
pOperator
,
startTs
);
setAllSourcesCompleted
(
pOperator
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
8ad85146
...
...
@@ -690,6 +690,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
qDebug
(
"start to merge final sorted rows, %s"
,
GET_TASKID
(
pTaskInfo
));
SSDataBlock
*
pBlock
=
getMultiwaySortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pInfo
->
matchInfo
.
pList
,
pOperator
);
if
(
pBlock
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
...
...
@@ -754,7 +756,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
0
);
SSDataBlock
*
pInputBlock
=
createResDataBlock
(
pChildNode
->
pOutputDataBlockDesc
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
1024
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
pInfo
->
groupSort
=
pMergePhyNode
->
groupSort
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录