Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4f81a338
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4f81a338
编写于
2月 14, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
extract tqRead out of tq
上级
8d243f77
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
166 addition
and
165 deletion
+166
-165
source/dnode/vnode/inc/tq.h
source/dnode/vnode/inc/tq.h
+1
-1
source/dnode/vnode/src/inc/tqInt.h
source/dnode/vnode/src/inc/tqInt.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-145
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+157
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+5
-19
未找到文件。
source/dnode/vnode/inc/tq.h
浏览文件 @
4f81a338
...
@@ -38,7 +38,7 @@ extern "C" {
...
@@ -38,7 +38,7 @@ extern "C" {
typedef
struct
STQ
STQ
;
typedef
struct
STQ
STQ
;
// memory allocator provided by vnode
// memory allocator provided by vnode
typedef
struct
STqMemRef
{
typedef
struct
{
SMemAllocatorFactory
*
pAllocatorFactory
;
SMemAllocatorFactory
*
pAllocatorFactory
;
SMemAllocator
*
pAllocator
;
SMemAllocator
*
pAllocator
;
}
STqMemRef
;
}
STqMemRef
;
...
...
source/dnode/vnode/src/inc/tqInt.h
浏览文件 @
4f81a338
...
@@ -20,6 +20,7 @@
...
@@ -20,6 +20,7 @@
#include "tlog.h"
#include "tlog.h"
#include "tq.h"
#include "tq.h"
#include "trpc.h"
#include "trpc.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
4f81a338
...
@@ -268,15 +268,12 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -268,15 +268,12 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
void
*
abuf
=
buf
;
void
*
abuf
=
buf
;
tEncodeSMqConsumeRsp
(
&
abuf
,
&
rsp
);
tEncodeSMqConsumeRsp
(
&
abuf
,
&
rsp
);
if
(
rsp
.
pBlockData
)
{
if
(
rsp
.
pBlockData
)
{
taosArrayDestroyEx
(
rsp
.
pBlockData
,
(
void
(
*
)(
void
*
))
tDeleteSSDataBlock
);
taosArrayDestroyEx
(
rsp
.
pBlockData
,
(
void
(
*
)(
void
*
))
tDeleteSSDataBlock
);
rsp
.
pBlockData
=
NULL
;
rsp
.
pBlockData
=
NULL
;
/*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/
/*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/
/*tDeleteSSDataBlock(pBlock);*/
/*}*/
/*taosArrayDestroy(rsp.pBlockData);*/
}
}
pMsg
->
pCont
=
buf
;
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
pMsg
->
code
=
0
;
...
@@ -344,143 +341,3 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
...
@@ -344,143 +341,3 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
terrno
=
TSDB_CODE_SUCCESS
;
terrno
=
TSDB_CODE_SUCCESS
;
return
0
;
return
0
;
}
}
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
)
{
STqReadHandle
*
pReadHandle
=
malloc
(
sizeof
(
STqReadHandle
));
if
(
pReadHandle
==
NULL
)
{
return
NULL
;
}
pReadHandle
->
pVnodeMeta
=
pMeta
;
pReadHandle
->
pMsg
=
NULL
;
pReadHandle
->
ver
=
-
1
;
pReadHandle
->
pColIdList
=
NULL
;
pReadHandle
->
sver
=
-
1
;
pReadHandle
->
pSchema
=
NULL
;
pReadHandle
->
pSchemaWrapper
=
NULL
;
return
pReadHandle
;
}
void
tqReadHandleSetMsg
(
STqReadHandle
*
pReadHandle
,
SSubmitMsg
*
pMsg
,
int64_t
ver
)
{
pReadHandle
->
pMsg
=
pMsg
;
pMsg
->
length
=
htonl
(
pMsg
->
length
);
pMsg
->
numOfBlocks
=
htonl
(
pMsg
->
numOfBlocks
);
tInitSubmitMsgIter
(
pMsg
,
&
pReadHandle
->
msgIter
);
pReadHandle
->
ver
=
ver
;
memset
(
&
pReadHandle
->
blkIter
,
0
,
sizeof
(
SSubmitBlkIter
));
}
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
)
{
while
(
1
)
{
if
(
tGetSubmitMsgNext
(
&
pHandle
->
msgIter
,
&
pHandle
->
pBlock
)
<
0
)
{
return
false
;
}
if
(
pHandle
->
pBlock
==
NULL
)
return
false
;
pHandle
->
pBlock
->
uid
=
htobe64
(
pHandle
->
pBlock
->
uid
);
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT
(
pHandle
->
tbIdHash
);
void
*
ret
=
taosHashGet
(
pHandle
->
tbIdHash
,
&
pHandle
->
pBlock
->
uid
,
sizeof
(
int64_t
));
if
(
ret
!=
NULL
)
{
pHandle
->
pBlock
->
tid
=
htonl
(
pHandle
->
pBlock
->
tid
);
pHandle
->
pBlock
->
sversion
=
htonl
(
pHandle
->
pBlock
->
sversion
);
pHandle
->
pBlock
->
dataLen
=
htonl
(
pHandle
->
pBlock
->
dataLen
);
pHandle
->
pBlock
->
schemaLen
=
htonl
(
pHandle
->
pBlock
->
schemaLen
);
pHandle
->
pBlock
->
numOfRows
=
htons
(
pHandle
->
pBlock
->
numOfRows
);
return
true
;
}
}
return
false
;
}
int
tqRetrieveDataBlockInfo
(
STqReadHandle
*
pHandle
,
SDataBlockInfo
*
pBlockInfo
)
{
/*int32_t sversion = pHandle->pBlock->sversion;*/
/*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
pBlockInfo
->
numOfCols
=
taosArrayGetSize
(
pHandle
->
pColIdList
);
pBlockInfo
->
rows
=
pHandle
->
pBlock
->
numOfRows
;
pBlockInfo
->
uid
=
pHandle
->
pBlock
->
uid
;
return
0
;
}
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
)
{
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
int32_t
sversion
=
0
;
if
(
pHandle
->
sver
!=
sversion
)
{
pHandle
->
pSchema
=
metaGetTbTSchema
(
pHandle
->
pVnodeMeta
,
pHandle
->
pBlock
->
uid
,
sversion
);
tb_uid_t
quid
;
STbCfg
*
pTbCfg
=
metaGetTbInfoByUid
(
pHandle
->
pVnodeMeta
,
pHandle
->
pBlock
->
uid
);
if
(
pTbCfg
->
type
==
META_CHILD_TABLE
)
{
quid
=
pTbCfg
->
ctbCfg
.
suid
;
}
else
{
quid
=
pHandle
->
pBlock
->
uid
;
}
pHandle
->
pSchemaWrapper
=
metaGetTableSchema
(
pHandle
->
pVnodeMeta
,
quid
,
sversion
,
true
);
pHandle
->
sver
=
sversion
;
}
STSchema
*
pTschema
=
pHandle
->
pSchema
;
SSchemaWrapper
*
pSchemaWrapper
=
pHandle
->
pSchemaWrapper
;
int32_t
numOfRows
=
pHandle
->
pBlock
->
numOfRows
;
int32_t
numOfCols
=
pHandle
->
pSchema
->
numOfCols
;
int32_t
colNumNeed
=
taosArrayGetSize
(
pHandle
->
pColIdList
);
// TODO: stable case
if
(
colNumNeed
>
pSchemaWrapper
->
nCols
)
{
colNumNeed
=
pSchemaWrapper
->
nCols
;
}
SArray
*
pArray
=
taosArrayInit
(
colNumNeed
,
sizeof
(
SColumnInfoData
));
if
(
pArray
==
NULL
)
{
return
NULL
;
}
int
j
=
0
;
for
(
int32_t
i
=
0
;
i
<
colNumNeed
;
i
++
)
{
int32_t
colId
=
*
(
int32_t
*
)
taosArrayGet
(
pHandle
->
pColIdList
,
i
);
while
(
j
<
pSchemaWrapper
->
nCols
&&
pSchemaWrapper
->
pSchema
[
j
].
colId
<
colId
)
{
j
++
;
}
SSchema
*
pColSchema
=
&
pSchemaWrapper
->
pSchema
[
j
];
SColumnInfoData
colInfo
=
{
0
};
int
sz
=
numOfRows
*
pColSchema
->
bytes
;
colInfo
.
info
.
bytes
=
pColSchema
->
bytes
;
colInfo
.
info
.
colId
=
colId
;
colInfo
.
info
.
type
=
pColSchema
->
type
;
colInfo
.
pData
=
calloc
(
1
,
sz
);
if
(
colInfo
.
pData
==
NULL
)
{
// TODO free
taosArrayDestroy
(
pArray
);
return
NULL
;
}
taosArrayPush
(
pArray
,
&
colInfo
);
}
STSRowIter
iter
=
{
0
};
tdSTSRowIterInit
(
&
iter
,
pTschema
);
STSRow
*
row
;
// int32_t kvIdx = 0;
int32_t
curRow
=
0
;
tInitSubmitBlkIter
(
pHandle
->
pBlock
,
&
pHandle
->
blkIter
);
while
((
row
=
tGetSubmitBlkNext
(
&
pHandle
->
blkIter
))
!=
NULL
)
{
tdSTSRowIterReset
(
&
iter
,
row
);
// get all wanted col of that block
for
(
int32_t
i
=
0
;
i
<
colNumNeed
;
i
++
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pArray
,
i
);
STColumn
*
pCol
=
schemaColAt
(
pTschema
,
i
);
// TODO
ASSERT
(
pCol
->
colId
==
pColData
->
info
.
colId
);
// void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
SCellVal
sVal
=
{
0
};
if
(
!
tdSTSRowIterNext
(
&
iter
,
pCol
->
colId
,
pCol
->
type
,
&
sVal
))
{
// TODO: reach end
break
;
}
memcpy
(
POINTER_SHIFT
(
pColData
->
pData
,
curRow
*
pCol
->
bytes
),
sVal
.
val
,
pCol
->
bytes
);
}
curRow
++
;
}
return
pArray
;
}
source/dnode/vnode/src/tq/tqRead.c
0 → 100644
浏览文件 @
4f81a338
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tqInt.h"
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
)
{
STqReadHandle
*
pReadHandle
=
malloc
(
sizeof
(
STqReadHandle
));
if
(
pReadHandle
==
NULL
)
{
return
NULL
;
}
pReadHandle
->
pVnodeMeta
=
pMeta
;
pReadHandle
->
pMsg
=
NULL
;
pReadHandle
->
ver
=
-
1
;
pReadHandle
->
pColIdList
=
NULL
;
pReadHandle
->
sver
=
-
1
;
pReadHandle
->
pSchema
=
NULL
;
pReadHandle
->
pSchemaWrapper
=
NULL
;
return
pReadHandle
;
}
void
tqReadHandleSetMsg
(
STqReadHandle
*
pReadHandle
,
SSubmitMsg
*
pMsg
,
int64_t
ver
)
{
pReadHandle
->
pMsg
=
pMsg
;
pMsg
->
length
=
htonl
(
pMsg
->
length
);
pMsg
->
numOfBlocks
=
htonl
(
pMsg
->
numOfBlocks
);
tInitSubmitMsgIter
(
pMsg
,
&
pReadHandle
->
msgIter
);
pReadHandle
->
ver
=
ver
;
memset
(
&
pReadHandle
->
blkIter
,
0
,
sizeof
(
SSubmitBlkIter
));
}
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
)
{
while
(
1
)
{
if
(
tGetSubmitMsgNext
(
&
pHandle
->
msgIter
,
&
pHandle
->
pBlock
)
<
0
)
{
return
false
;
}
if
(
pHandle
->
pBlock
==
NULL
)
return
false
;
pHandle
->
pBlock
->
uid
=
htobe64
(
pHandle
->
pBlock
->
uid
);
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT
(
pHandle
->
tbIdHash
);
void
*
ret
=
taosHashGet
(
pHandle
->
tbIdHash
,
&
pHandle
->
pBlock
->
uid
,
sizeof
(
int64_t
));
if
(
ret
!=
NULL
)
{
pHandle
->
pBlock
->
tid
=
htonl
(
pHandle
->
pBlock
->
tid
);
pHandle
->
pBlock
->
sversion
=
htonl
(
pHandle
->
pBlock
->
sversion
);
pHandle
->
pBlock
->
dataLen
=
htonl
(
pHandle
->
pBlock
->
dataLen
);
pHandle
->
pBlock
->
schemaLen
=
htonl
(
pHandle
->
pBlock
->
schemaLen
);
pHandle
->
pBlock
->
numOfRows
=
htons
(
pHandle
->
pBlock
->
numOfRows
);
return
true
;
}
}
return
false
;
}
int
tqRetrieveDataBlockInfo
(
STqReadHandle
*
pHandle
,
SDataBlockInfo
*
pBlockInfo
)
{
/*int32_t sversion = pHandle->pBlock->sversion;*/
/*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/
pBlockInfo
->
numOfCols
=
taosArrayGetSize
(
pHandle
->
pColIdList
);
pBlockInfo
->
rows
=
pHandle
->
pBlock
->
numOfRows
;
pBlockInfo
->
uid
=
pHandle
->
pBlock
->
uid
;
return
0
;
}
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
)
{
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
int32_t
sversion
=
0
;
if
(
pHandle
->
sver
!=
sversion
)
{
pHandle
->
pSchema
=
metaGetTbTSchema
(
pHandle
->
pVnodeMeta
,
pHandle
->
pBlock
->
uid
,
sversion
);
tb_uid_t
quid
;
STbCfg
*
pTbCfg
=
metaGetTbInfoByUid
(
pHandle
->
pVnodeMeta
,
pHandle
->
pBlock
->
uid
);
if
(
pTbCfg
->
type
==
META_CHILD_TABLE
)
{
quid
=
pTbCfg
->
ctbCfg
.
suid
;
}
else
{
quid
=
pHandle
->
pBlock
->
uid
;
}
pHandle
->
pSchemaWrapper
=
metaGetTableSchema
(
pHandle
->
pVnodeMeta
,
quid
,
sversion
,
true
);
pHandle
->
sver
=
sversion
;
}
STSchema
*
pTschema
=
pHandle
->
pSchema
;
SSchemaWrapper
*
pSchemaWrapper
=
pHandle
->
pSchemaWrapper
;
int32_t
numOfRows
=
pHandle
->
pBlock
->
numOfRows
;
int32_t
numOfCols
=
pHandle
->
pSchema
->
numOfCols
;
int32_t
colNumNeed
=
taosArrayGetSize
(
pHandle
->
pColIdList
);
// TODO: stable case
if
(
colNumNeed
>
pSchemaWrapper
->
nCols
)
{
colNumNeed
=
pSchemaWrapper
->
nCols
;
}
SArray
*
pArray
=
taosArrayInit
(
colNumNeed
,
sizeof
(
SColumnInfoData
));
if
(
pArray
==
NULL
)
{
return
NULL
;
}
int
j
=
0
;
for
(
int32_t
i
=
0
;
i
<
colNumNeed
;
i
++
)
{
int32_t
colId
=
*
(
int32_t
*
)
taosArrayGet
(
pHandle
->
pColIdList
,
i
);
while
(
j
<
pSchemaWrapper
->
nCols
&&
pSchemaWrapper
->
pSchema
[
j
].
colId
<
colId
)
{
j
++
;
}
SSchema
*
pColSchema
=
&
pSchemaWrapper
->
pSchema
[
j
];
SColumnInfoData
colInfo
=
{
0
};
int
sz
=
numOfRows
*
pColSchema
->
bytes
;
colInfo
.
info
.
bytes
=
pColSchema
->
bytes
;
colInfo
.
info
.
colId
=
colId
;
colInfo
.
info
.
type
=
pColSchema
->
type
;
colInfo
.
pData
=
calloc
(
1
,
sz
);
if
(
colInfo
.
pData
==
NULL
)
{
// TODO free
taosArrayDestroy
(
pArray
);
return
NULL
;
}
taosArrayPush
(
pArray
,
&
colInfo
);
}
STSRowIter
iter
=
{
0
};
tdSTSRowIterInit
(
&
iter
,
pTschema
);
STSRow
*
row
;
// int32_t kvIdx = 0;
int32_t
curRow
=
0
;
tInitSubmitBlkIter
(
pHandle
->
pBlock
,
&
pHandle
->
blkIter
);
while
((
row
=
tGetSubmitBlkNext
(
&
pHandle
->
blkIter
))
!=
NULL
)
{
tdSTSRowIterReset
(
&
iter
,
row
);
// get all wanted col of that block
for
(
int32_t
i
=
0
;
i
<
colNumNeed
;
i
++
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pArray
,
i
);
STColumn
*
pCol
=
schemaColAt
(
pTschema
,
i
);
// TODO
ASSERT
(
pCol
->
colId
==
pColData
->
info
.
colId
);
// void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
SCellVal
sVal
=
{
0
};
if
(
!
tdSTSRowIterNext
(
&
iter
,
pCol
->
colId
,
pCol
->
type
,
&
sVal
))
{
// TODO: reach end
break
;
}
memcpy
(
POINTER_SHIFT
(
pColData
->
pData
,
curRow
*
pCol
->
bytes
),
sVal
.
val
,
pCol
->
bytes
);
}
curRow
++
;
}
return
pArray
;
}
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
4f81a338
...
@@ -16,19 +16,6 @@
...
@@ -16,19 +16,6 @@
#include "tq.h"
#include "tq.h"
#include "vnd.h"
#include "vnd.h"
#if 0
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_VND_MQ_SET_CUR:
if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) {
// TODO: handle error
}
break;
}
return 0;
}
#endif
int
vnodeProcessWMsgs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
)
{
int
vnodeProcessWMsgs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
)
{
SRpcMsg
*
pMsg
;
SRpcMsg
*
pMsg
;
...
@@ -36,9 +23,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
...
@@ -36,9 +23,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pMsgs
,
i
);
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pMsgs
,
i
);
// ser request version
// ser request version
void
*
pBuf
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
void
*
pBuf
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int64_t
ver
=
pVnode
->
state
.
processed
++
;
int64_t
ver
=
pVnode
->
state
.
processed
++
;
taosEncodeFixed
U
64
(
&
pBuf
,
ver
);
taosEncodeFixed
I
64
(
&
pBuf
,
ver
);
if
(
walWrite
(
pVnode
->
pWal
,
ver
,
pMsg
->
msgType
,
pMsg
->
pCont
,
pMsg
->
contLen
)
<
0
)
{
if
(
walWrite
(
pVnode
->
pWal
,
ver
,
pMsg
->
msgType
,
pMsg
->
pCont
,
pMsg
->
contLen
)
<
0
)
{
// TODO: handle error
// TODO: handle error
...
@@ -55,7 +42,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
...
@@ -55,7 +42,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
int
vnodeApplyWMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int
vnodeApplyWMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
SVCreateTbReq
vCreateTbReq
;
SVCreateTbReq
vCreateTbReq
;
SVCreateTbBatchReq
vCreateTbBatchReq
;
SVCreateTbBatchReq
vCreateTbBatchReq
;
void
*
ptr
=
vnodeMalloc
(
pVnode
,
pMsg
->
contLen
);
void
*
ptr
=
vnodeMalloc
(
pVnode
,
pMsg
->
contLen
);
if
(
ptr
==
NULL
)
{
if
(
ptr
==
NULL
)
{
// TODO: handle error
// TODO: handle error
}
}
...
@@ -64,8 +51,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -64,8 +51,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
memcpy
(
ptr
,
pMsg
->
pCont
,
pMsg
->
contLen
);
memcpy
(
ptr
,
pMsg
->
pCont
,
pMsg
->
contLen
);
// todo: change the interface here
// todo: change the interface here
u
int64_t
ver
;
int64_t
ver
;
taosDecodeFixed
U
64
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
ver
);
taosDecodeFixed
I
64
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
ver
);
if
(
tqPushMsg
(
pVnode
->
pTq
,
ptr
,
ver
)
<
0
)
{
if
(
tqPushMsg
(
pVnode
->
pTq
,
ptr
,
ver
)
<
0
)
{
// TODO: handle error
// TODO: handle error
}
}
...
@@ -132,7 +119,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -132,7 +119,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
break
;
}
break
;
case
TDMT_VND_MQ_REB
:
{
case
TDMT_VND_MQ_REB
:
{
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
ptr
,
sizeof
(
SMsgHead
)))
<
0
)
{
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
ptr
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
}
break
;
}
break
;
default:
default:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录