Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2225411e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2225411e
编写于
7月 10, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(stream)
上级
8afb5454
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
29 addition
and
16 deletion
+29
-16
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+9
-6
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+16
-6
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+0
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-1
未找到文件。
source/dnode/vnode/src/inc/tq.h
浏览文件 @
2225411e
...
...
@@ -129,7 +129,7 @@ typedef struct {
static
STqMgmt
tqMgmt
=
{
0
};
// tqRead
int64_t
tqScan
Log
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
offset
);
int64_t
tqScan
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
offset
);
int64_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
);
// tqExec
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
2225411e
...
...
@@ -112,7 +112,8 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
};
tmsgSendRsp
(
&
resp
);
tqDebug
(
"vgId:%d from consumer:%"
PRId64
", (epoch %d) send rsp, res msg type %d, reqOffset:%"
PRId64
", rspOffset:%"
PRId64
,
tqDebug
(
"vgId:%d from consumer:%"
PRId64
", (epoch %d) send rsp, res msg type %d, reqOffset:%"
PRId64
", rspOffset:%"
PRId64
,
TD_VID
(
pTq
->
pVnode
),
pReq
->
consumerId
,
pReq
->
epoch
,
pRsp
->
resMsgType
,
pRsp
->
reqOffset
,
pRsp
->
rspOffset
);
return
0
;
...
...
@@ -179,8 +180,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
tDecoderClear
(
&
decoder
);
if
(
offset
.
val
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
tqDebug
(
"receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%"
PRId64
", ts:%"
PRId64
,
offset
.
subKey
,
TD_VID
(
pTq
->
pVnode
),
offset
.
val
.
uid
,
offset
.
val
.
ts
);
tqDebug
(
"receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%"
PRId64
", ts:%"
PRId64
,
offset
.
subKey
,
TD_VID
(
pTq
->
pVnode
),
offset
.
val
.
uid
,
offset
.
val
.
ts
);
}
else
if
(
offset
.
val
.
type
==
TMQ_OFFSET__LOG
)
{
tqDebug
(
"receive offset commit msg to %s on vgId:%d, offset(type:log) version:%"
PRId64
,
offset
.
subKey
,
TD_VID
(
pTq
->
pVnode
),
offset
.
val
.
version
);
...
...
@@ -316,9 +317,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
// 3.query
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
&&
fetchOffsetNew
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
fetchOffsetNew
.
version
++
;
if
(
tqScan
Log
(
pTq
,
&
pHandle
->
execHandle
,
&
dataRsp
,
&
fetchOffsetNew
)
<
0
)
{
if
(
tqScan
(
pTq
,
&
pHandle
->
execHandle
,
&
dataRsp
,
&
fetchOffsetNew
)
<
0
)
{
ASSERT
(
0
);
code
=
-
1
;
goto
OVER
;
...
...
@@ -333,7 +334,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
goto
OVER
;
}
if
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
&&
fetchOffsetNew
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
)
{
int64_t
fetchVer
=
fetchOffsetNew
.
version
+
1
;
SWalCkHead
*
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
if
(
pCkHead
==
NULL
)
{
...
...
@@ -411,6 +412,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
taosMemoryFree
(
pCkHead
);
#if 0
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
...
...
@@ -421,6 +423,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
#endif
}
else
if
(
fetchOffsetNew
.
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
ASSERT
(
0
);
}
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
2225411e
...
...
@@ -59,15 +59,17 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return
0
;
}
int64_t
tqScan
Log
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
)
{
int64_t
tqScan
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
)
{
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
0
];
if
(
qStreamPrepareScan
(
task
,
pOffset
)
<
0
)
{
ASSERT
(
pOffset
->
type
==
TMQ_OFFSET__LOG
);
pRsp
->
rspOffset
=
*
pOffset
;
pRsp
->
rspOffset
.
version
--
;
return
0
;
}
int32_t
rowCnt
=
0
;
while
(
1
)
{
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
...
...
@@ -77,11 +79,19 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
if
(
pDataBlock
!=
NULL
)
{
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
);
pRsp
->
blockNum
++
;
if
(
pRsp
->
withTbName
)
{
int64_t
uid
=
pExec
->
pExecReader
[
0
]
->
msgIter
.
uid
;
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
);
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
int64_t
uid
=
pExec
->
pExecReader
[
0
]
->
msgIter
.
uid
;
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
);
}
else
{
pRsp
->
withTbName
=
0
;
}
}
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
rowCnt
+=
pDataBlock
->
info
.
rows
;
if
(
rowCnt
>=
4096
)
break
;
}
pRsp
->
blockNum
++
;
continue
;
}
...
...
@@ -94,12 +104,12 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
ASSERT
(
0
);
}
ASSERT
(
pRsp
->
rspOffset
.
type
!=
0
);
if
(
pRsp
->
rspOffset
.
type
==
TMQ_OFFSET__LOG
)
{
ASSERT
(
pRsp
->
rspOffset
.
version
+
1
>=
pRsp
->
reqOffset
.
version
);
}
ASSERT
(
pRsp
->
rspOffset
.
type
!=
0
);
break
;
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
2225411e
...
...
@@ -144,6 +144,7 @@ typedef struct {
void
*
metaBlk
;
// for tmq fetching meta
SSDataBlock
*
pullOverBlk
;
// for streaming
SWalFilterCond
cond
;
int64_t
lastScanUid
;
}
SStreamTaskInfo
;
typedef
struct
SExecTaskInfo
{
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
2225411e
...
...
@@ -336,8 +336,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
qDebug
(
"tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d"
,
uid
,
ts
,
pTableScanInfo
->
currentTable
,
tableSz
);
}
else
{
// switch to log
}
}
else
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
2225411e
...
...
@@ -1273,6 +1273,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/
/*} else {*/
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
/*pTaskInfo->streamInfo.lastScanUid */
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
+
1
>=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
);
/*}*/
return
NULL
;
...
...
@@ -1490,7 +1491,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
else
if
(
pInfo
->
blockType
==
STREAM_INPUT__TABLE_SCAN
)
{
/*ASSERT(0);*/
ASSERT
(
0
);
// check reader last status
// if not match, reset status
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pTableScanOp
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录