Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bf78f63e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bf78f63e
编写于
3月 24, 2023
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix:send data batch if consume wal where subscribe db
上级
20e2010f
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
30 addition
and
56 deletion
+30
-56
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+26
-30
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+3
-24
未找到文件。
source/dnode/vnode/src/inc/tq.h
浏览文件 @
bf78f63e
...
@@ -144,8 +144,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
...
@@ -144,8 +144,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
,
uint64_t
reqId
);
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
,
uint64_t
reqId
);
// tqExec
// tqExec
int32_t
tqTaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SPackedData
submit
,
STaosxRsp
*
pRsp
);
int32_t
tqTaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SPackedData
submit
,
STaosxRsp
*
pRsp
,
int32_t
*
totalRows
);
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
int32_t
tqAddBlockDataToRsp
(
const
SSDataBlock
*
pBlock
,
SMqDataRsp
*
pRsp
,
int32_t
numOfCols
,
int8_t
precision
);
int32_t
tqAddBlockDataToRsp
(
const
SSDataBlock
*
pBlock
,
SMqDataRsp
*
pRsp
,
int32_t
numOfCols
,
int8_t
precision
);
int32_t
tqSendDataRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqDataRsp
*
pRsp
,
int32_t
type
);
int32_t
tqSendDataRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqDataRsp
*
pRsp
,
int32_t
type
);
int32_t
tqPushDataRsp
(
STQ
*
pTq
,
STqPushEntry
*
pPushEntry
);
int32_t
tqPushDataRsp
(
STQ
*
pTq
,
STqPushEntry
*
pPushEntry
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
bf78f63e
...
@@ -596,8 +596,6 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
...
@@ -596,8 +596,6 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
}
}
if
(
offset
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
offset
.
type
==
TMQ_OFFSET__LOG
)
{
// if (offset.type == TMQ_OFFSET__LOG) {
int64_t
fetchVer
=
offset
.
version
+
1
;
int64_t
fetchVer
=
offset
.
version
+
1
;
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
if
(
pCkHead
==
NULL
)
{
if
(
pCkHead
==
NULL
)
{
...
@@ -605,9 +603,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
...
@@ -605,9 +603,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
int
totalRows
=
0
;
while
(
1
)
{
while
(
1
)
{
// todo refactor: this is not correct.
// todo refactor: this is not correct.
int32_t
savedEpoch
=
atomic_load_32
(
&
pHandle
->
epoch
);
int32_t
savedEpoch
=
atomic_load_32
(
&
pHandle
->
epoch
);
...
@@ -630,32 +627,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
...
@@ -630,32 +627,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
" (epoch %d) iter log, vgId:%d offset %"
PRId64
" msgType %d"
,
consumerId
,
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
" (epoch %d) iter log, vgId:%d offset %"
PRId64
" msgType %d"
,
consumerId
,
pRequest
->
epoch
,
vgId
,
fetchVer
,
pHead
->
msgType
);
pRequest
->
epoch
,
vgId
,
fetchVer
,
pHead
->
msgType
);
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
// process meta
SPackedData
submit
=
{
if
(
pHead
->
msgType
!=
TDMT_VND_SUBMIT
)
{
.
msgStr
=
POINTER_SHIFT
(
pHead
->
body
,
sizeof
(
SSubmitReq2Msg
)),
.
msgLen
=
pHead
->
bodyLen
-
sizeof
(
SSubmitReq2Msg
),
.
ver
=
pHead
->
version
,
};
if
(
tqTaosxScanLog
(
pTq
,
pHandle
,
submit
,
&
taosxRsp
)
<
0
)
{
tqError
(
"tmq poll: tqTaosxScanLog error %"
PRId64
", in vgId:%d, subkey %s"
,
consumerId
,
vgId
,
pRequest
->
subKey
);
return
-
1
;
}
if
(
taosxRsp
.
blockNum
>
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
taosMemoryFreeClear
(
pCkHead
);
return
code
;
}
else
{
fetchVer
++
;
}
}
else
{
/*A(pHandle->fetchMeta);*/
/*A(IS_META_MSG(pHead->msgType));*/
tqDebug
(
"fetch meta msg, ver:%"
PRId64
", type:%s"
,
pHead
->
version
,
TMSG_INFO
(
pHead
->
msgType
));
tqDebug
(
"fetch meta msg, ver:%"
PRId64
", type:%s"
,
pHead
->
version
,
TMSG_INFO
(
pHead
->
msgType
));
tqOffsetResetToLog
(
&
metaRsp
.
rspOffset
,
fetchVer
);
tqOffsetResetToLog
(
&
metaRsp
.
rspOffset
,
fetchVer
);
metaRsp
.
resMsgType
=
pHead
->
msgType
;
metaRsp
.
resMsgType
=
pHead
->
msgType
;
...
@@ -672,6 +645,29 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
...
@@ -672,6 +645,29 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
tDeleteSTaosxRsp
(
&
taosxRsp
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
return
code
;
return
code
;
}
}
// process data
SPackedData
submit
=
{
.
msgStr
=
POINTER_SHIFT
(
pHead
->
body
,
sizeof
(
SSubmitReq2Msg
)),
.
msgLen
=
pHead
->
bodyLen
-
sizeof
(
SSubmitReq2Msg
),
.
ver
=
pHead
->
version
,
};
if
(
tqTaosxScanLog
(
pTq
,
pHandle
,
submit
,
&
taosxRsp
,
&
totalRows
)
<
0
)
{
tqError
(
"tmq poll: tqTaosxScanLog error %"
PRId64
", in vgId:%d, subkey %s"
,
consumerId
,
vgId
,
pRequest
->
subKey
);
return
-
1
;
}
if
(
totalRows
>=
4096
||
taosxRsp
.
createTableNum
>
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
taosMemoryFreeClear
(
pCkHead
);
return
code
;
}
else
{
fetchVer
++
;
}
}
}
}
}
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
bf78f63e
...
@@ -230,23 +230,15 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
...
@@ -230,23 +230,15 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
return
0
;
return
0
;
}
}
int32_t
tqTaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SPackedData
submit
,
STaosxRsp
*
pRsp
)
{
int32_t
tqTaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SPackedData
submit
,
STaosxRsp
*
pRsp
,
int32_t
*
totalRows
)
{
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
/*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/
SArray
*
pBlocks
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
SArray
*
pBlocks
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
SArray
*
pSchemas
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
SArray
*
pSchemas
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
STqReader
*
pReader
=
pExec
->
pExecReader
;
STqReader
*
pReader
=
pExec
->
pExecReader
;
/*tqReaderSetDataMsg(pReader, pReq, 0);*/
tqReaderSetSubmitReq2
(
pReader
,
submit
.
msgStr
,
submit
.
msgLen
,
submit
.
ver
);
tqReaderSetSubmitReq2
(
pReader
,
submit
.
msgStr
,
submit
.
msgLen
,
submit
.
ver
);
while
(
tqNextDataBlock2
(
pReader
))
{
while
(
tqNextDataBlock2
(
pReader
))
{
/*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear
(
pBlocks
);
taosArrayClear
(
pBlocks
);
taosArrayClear
(
pSchemas
);
taosArrayClear
(
pSchemas
);
SSubmitTbData
*
pSubmitTbDataRet
=
NULL
;
SSubmitTbData
*
pSubmitTbDataRet
=
NULL
;
...
@@ -254,7 +246,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
...
@@ -254,7 +246,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if
(
terrno
==
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
)
continue
;
if
(
terrno
==
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
)
continue
;
}
}
if
(
pRsp
->
withTbName
)
{
if
(
pRsp
->
withTbName
)
{
/*int64_t uid = pExec->pExecReader->msgIter.uid;*/
int64_t
uid
=
pExec
->
pExecReader
->
lastBlkUid
;
int64_t
uid
=
pExec
->
pExecReader
->
lastBlkUid
;
if
(
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
,
taosArrayGetSize
(
pBlocks
))
<
0
)
{
if
(
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
,
taosArrayGetSize
(
pBlocks
))
<
0
)
{
taosArrayDestroyEx
(
pBlocks
,
(
FDelete
)
blockDataFreeRes
);
taosArrayDestroyEx
(
pBlocks
,
(
FDelete
)
blockDataFreeRes
);
...
@@ -296,6 +287,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
...
@@ -296,6 +287,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
SSDataBlock
*
pBlock
=
taosArrayGet
(
pBlocks
,
i
);
SSDataBlock
*
pBlock
=
taosArrayGet
(
pBlocks
,
i
);
tqAddBlockDataToRsp
(
pBlock
,
(
SMqDataRsp
*
)
pRsp
,
taosArrayGetSize
(
pBlock
->
pDataBlock
),
tqAddBlockDataToRsp
(
pBlock
,
(
SMqDataRsp
*
)
pRsp
,
taosArrayGetSize
(
pBlock
->
pDataBlock
),
pTq
->
pVnode
->
config
.
tsdbCfg
.
precision
);
pTq
->
pVnode
->
config
.
tsdbCfg
.
precision
);
totalRows
+=
pBlock
->
info
.
rows
;
blockDataFreeRes
(
pBlock
);
blockDataFreeRes
(
pBlock
);
SSchemaWrapper
*
pSW
=
taosArrayGetP
(
pSchemas
,
i
);
SSchemaWrapper
*
pSW
=
taosArrayGetP
(
pSchemas
,
i
);
taosArrayPush
(
pRsp
->
blockSchema
,
&
pSW
);
taosArrayPush
(
pRsp
->
blockSchema
,
&
pSW
);
...
@@ -304,13 +296,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
...
@@ -304,13 +296,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
}
}
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
STqReader
*
pReader
=
pExec
->
pExecReader
;
STqReader
*
pReader
=
pExec
->
pExecReader
;
/*tqReaderSetDataMsg(pReader, pReq, 0);*/
tqReaderSetSubmitReq2
(
pReader
,
submit
.
msgStr
,
submit
.
msgLen
,
submit
.
ver
);
tqReaderSetSubmitReq2
(
pReader
,
submit
.
msgStr
,
submit
.
msgLen
,
submit
.
ver
);
while
(
tqNextDataBlockFilterOut2
(
pReader
,
pExec
->
execDb
.
pFilterOutTbUid
))
{
while
(
tqNextDataBlockFilterOut2
(
pReader
,
pExec
->
execDb
.
pFilterOutTbUid
))
{
/*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear
(
pBlocks
);
taosArrayClear
(
pBlocks
);
taosArrayClear
(
pSchemas
);
taosArrayClear
(
pSchemas
);
SSubmitTbData
*
pSubmitTbDataRet
=
NULL
;
SSubmitTbData
*
pSubmitTbDataRet
=
NULL
;
...
@@ -355,15 +342,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
...
@@ -355,15 +342,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear
(
&
encoder
);
tEncoderClear
(
&
encoder
);
}
}
/*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
/*pTq->pVnode->config.tsdbCfg.precision);*/
/*blockDataFreeRes(&block);*/
/*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/
/*pRsp->blockNum++;*/
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pBlocks
);
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pBlocks
);
i
++
)
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
pBlocks
,
i
);
SSDataBlock
*
pBlock
=
taosArrayGet
(
pBlocks
,
i
);
tqAddBlockDataToRsp
(
pBlock
,
(
SMqDataRsp
*
)
pRsp
,
taosArrayGetSize
(
pBlock
->
pDataBlock
),
tqAddBlockDataToRsp
(
pBlock
,
(
SMqDataRsp
*
)
pRsp
,
taosArrayGetSize
(
pBlock
->
pDataBlock
),
pTq
->
pVnode
->
config
.
tsdbCfg
.
precision
);
pTq
->
pVnode
->
config
.
tsdbCfg
.
precision
);
totalRows
+=
pBlock
->
info
.
rows
;
blockDataFreeRes
(
pBlock
);
blockDataFreeRes
(
pBlock
);
SSchemaWrapper
*
pSW
=
taosArrayGetP
(
pSchemas
,
i
);
SSchemaWrapper
*
pSW
=
taosArrayGetP
(
pSchemas
,
i
);
taosArrayPush
(
pRsp
->
blockSchema
,
&
pSW
);
taosArrayPush
(
pRsp
->
blockSchema
,
&
pSW
);
...
@@ -373,9 +356,5 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
...
@@ -373,9 +356,5 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
}
}
taosArrayDestroy
(
pBlocks
);
taosArrayDestroy
(
pBlocks
);
taosArrayDestroy
(
pSchemas
);
taosArrayDestroy
(
pSchemas
);
// if (pRsp->blockNum == 0) {
// return -1;
// }
return
0
;
return
0
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录