Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
080f30d0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
080f30d0
编写于
7月 11, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: trim db
上级
aa016b65
变更
12
显示空白变更内容
内联
并排
Showing
12 changed file
with
191 addition
and
24 deletion
+191
-24
include/common/tmsg.h
include/common/tmsg.h
+14
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+4
-3
source/common/src/tmsg.c
source/common/src/tmsg.c
+50
-0
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+1
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+1
-0
source/dnode/mnode/impl/inc/mndPrivilege.h
source/dnode/mnode/impl/inc/mndPrivilege.h
+1
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+75
-2
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+1
-2
source/dnode/mnode/impl/src/mndPrivilege.c
source/dnode/mnode/impl/src/mndPrivilege.c
+2
-1
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+10
-10
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+31
-5
未找到文件。
include/common/tmsg.h
浏览文件 @
080f30d0
...
...
@@ -822,6 +822,20 @@ typedef struct {
int32_t
tSerializeSTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
STrimDbReq
*
pReq
);
int32_t
tDeserializeSTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
STrimDbReq
*
pReq
);
typedef
struct
{
int32_t
timestamp
;
}
SVTrimDbReq
;
int32_t
tSerializeSVTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
SVTrimDbReq
*
pReq
);
int32_t
tDeserializeSVTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
SVTrimDbReq
*
pReq
);
typedef
struct
{
int32_t
timestamp
;
}
SVDropTtlTableReq
;
int32_t
tSerializeSVDropTtlTableReq
(
void
*
buf
,
int32_t
bufLen
,
SVDropTtlTableReq
*
pReq
);
int32_t
tDeserializeSVDropTtlTableReq
(
void
*
buf
,
int32_t
bufLen
,
SVDropTtlTableReq
*
pReq
);
typedef
struct
{
int32_t
numOfVgroups
;
int32_t
numOfStables
;
...
...
include/common/tmsgdef.h
浏览文件 @
080f30d0
...
...
@@ -199,9 +199,10 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_REPLICA
,
"alter-replica"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_CONFIRM
,
"alter-confirm"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_HASHRANGE
,
"alter-hashrange"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMPACT
,
"compact"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_TTL_TABLE
,
"drop-ttl-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMMIT
,
"commit vnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMPACT
,
"vnode-compact"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_TTL_TABLE
,
"vnode-drop-ttl-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TRIM
,
"vnode-trim"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMMIT
,
"vnode-commit"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MAX_MSG
,
"vnd-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_SCH_MSG
)
...
...
source/common/src/tmsg.c
浏览文件 @
080f30d0
...
...
@@ -2672,6 +2672,56 @@ int32_t tDeserializeSTrimDbReq(void *buf, int32_t bufLen, STrimDbReq *pReq) {
return
0
;
}
int32_t
tSerializeSVTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
SVTrimDbReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
timestamp
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSVTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
SVTrimDbReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
timestamp
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSVDropTtlTableReq
(
void
*
buf
,
int32_t
bufLen
,
SVDropTtlTableReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
timestamp
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSVDropTtlTableReq
(
void
*
buf
,
int32_t
bufLen
,
SVDropTtlTableReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
timestamp
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSDbCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SDbCfgRsp
*
pRsp
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
080f30d0
...
...
@@ -171,6 +171,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_USE_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_COMPACT_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TRIM_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_DB_CFG
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_VGROUP_LIST
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_REDISTRIBUTE_VGROUP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
080f30d0
...
...
@@ -370,6 +370,7 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIRM
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_HASHRANGE
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_COMPACT
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TRIM
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_VNODE
,
vmPutMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_VNODE
,
vmPutMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/inc/mndPrivilege.h
浏览文件 @
080f30d0
...
...
@@ -54,6 +54,7 @@ typedef enum {
MND_OPER_ALTER_DB
,
MND_OPER_DROP_DB
,
MND_OPER_COMPACT_DB
,
MND_OPER_TRIM_DB
,
MND_OPER_USE_DB
,
MND_OPER_WRITE_DB
,
MND_OPER_READ_DB
,
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
080f30d0
...
...
@@ -42,6 +42,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
static
int32_t
mndProcessDropDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessUseDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessCompactDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessTrimDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndRetrieveDbs
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
);
static
void
mndCancelGetNextDb
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndProcessGetDbCfgReq
(
SRpcMsg
*
pReq
);
...
...
@@ -62,6 +63,7 @@ int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_DB
,
mndProcessDropDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_USE_DB
,
mndProcessUseDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_COMPACT_DB
,
mndProcessCompactDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_TRIM_DB
,
mndProcessTrimDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_DB_CFG
,
mndProcessGetDbCfgReq
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_DB
,
mndRetrieveDbs
);
...
...
@@ -1268,6 +1270,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
return
0
;
}
static
int32_t
mndCompactDb
(
SMnode
*
pMnode
,
SDbObj
*
pDb
)
{
return
0
;
}
static
int32_t
mndProcessCompactDbReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
int32_t
code
=
-
1
;
...
...
@@ -1279,7 +1283,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
goto
_OVER
;
}
mDebug
(
"db:%s, start to
sync
"
,
compactReq
.
db
);
mDebug
(
"db:%s, start to
compact
"
,
compactReq
.
db
);
pDb
=
mndAcquireDb
(
pMnode
,
compactReq
.
db
);
if
(
pDb
==
NULL
)
{
...
...
@@ -1290,7 +1294,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
goto
_OVER
;
}
// code = mndCompactDb(
);
code
=
mndCompactDb
(
pMnode
,
pDb
);
_OVER:
if
(
code
!=
0
)
{
...
...
@@ -1301,6 +1305,75 @@ _OVER:
return
code
;
}
static
int32_t
mndTrimDb
(
SMnode
*
pMnode
,
SDbObj
*
pDb
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
SVTrimDbReq
trimReq
=
{.
timestamp
=
taosGetTimestampSec
()};
int32_t
reqLen
=
tSerializeSVTrimDbReq
(
NULL
,
0
,
&
trimReq
);
int32_t
contLen
=
reqLen
+
sizeof
(
SMsgHead
);
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
SMsgHead
*
pHead
=
rpcMallocCont
(
contLen
);
if
(
pHead
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
pVgroup
->
vgId
);
tSerializeSVTrimDbReq
((
char
*
)
pHead
+
sizeof
(
SMsgHead
),
contLen
,
&
trimReq
);
SRpcMsg
rpcMsg
=
{.
msgType
=
TDMT_VND_TRIM
,
.
pCont
=
pHead
,
.
contLen
=
contLen
};
SEpSet
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
int32_t
code
=
tmsgSendReq
(
&
epSet
,
&
rpcMsg
);
if
(
code
!=
0
)
{
mError
(
"vgId:%d, failed to send vnode-trim request to vnode since 0x%x"
,
pVgroup
->
vgId
,
code
);
}
else
{
mDebug
(
"vgId:%d, send vnode-trim request to vnode, time:%d"
,
pVgroup
->
vgId
,
trimReq
.
timestamp
);
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndProcessTrimDbReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
int32_t
code
=
-
1
;
SDbObj
*
pDb
=
NULL
;
STrimDbReq
trimReq
=
{
0
};
if
(
tDeserializeSTrimDbReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
trimReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
mDebug
(
"db:%s, start to trim"
,
trimReq
.
db
);
pDb
=
mndAcquireDb
(
pMnode
,
trimReq
.
db
);
if
(
pDb
==
NULL
)
{
goto
_OVER
;
}
if
(
mndCheckDbPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_TRIM_DB
,
pDb
)
!=
0
)
{
goto
_OVER
;
}
code
=
mndTrimDb
(
pMnode
,
pDb
);
_OVER:
if
(
code
!=
0
)
{
mError
(
"db:%s, failed to process trim db req since %s"
,
trimReq
.
db
,
terrstr
());
}
mndReleaseDb
(
pMnode
,
pDb
);
return
code
;
}
const
char
*
mndGetDbStr
(
const
char
*
src
)
{
char
*
pos
=
strstr
(
src
,
TS_PATH_DELIMITER
);
if
(
pos
!=
NULL
)
++
pos
;
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
080f30d0
...
...
@@ -531,8 +531,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if
(
!
IsReq
(
pMsg
))
return
0
;
if
(
pMsg
->
msgType
==
TDMT_SCH_QUERY
||
pMsg
->
msgType
==
TDMT_SCH_MERGE_QUERY
||
pMsg
->
msgType
==
TDMT_SCH_QUERY_CONTINUE
||
pMsg
->
msgType
==
TDMT_SCH_QUERY_HEARTBEAT
||
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_MERGE_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_DROP_TASK
)
{
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_MERGE_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_DROP_TASK
)
{
return
0
;
}
if
(
mndAcquireRpcRef
(
pMsg
->
info
.
node
)
==
0
)
return
0
;
...
...
source/dnode/mnode/impl/src/mndPrivilege.c
浏览文件 @
080f30d0
...
...
@@ -155,7 +155,8 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
if
(
pUser
->
sysInfo
)
goto
_OVER
;
}
if
(
operType
==
MND_OPER_ALTER_DB
||
operType
==
MND_OPER_DROP_DB
||
operType
==
MND_OPER_COMPACT_DB
)
{
if
(
operType
==
MND_OPER_ALTER_DB
||
operType
==
MND_OPER_DROP_DB
||
operType
==
MND_OPER_COMPACT_DB
||
operType
==
MND_OPER_TRIM_DB
)
{
if
(
strcmp
(
pUser
->
user
,
pDb
->
createUser
)
==
0
&&
pUser
->
sysInfo
)
goto
_OVER
;
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
080f30d0
...
...
@@ -817,12 +817,14 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
SVDropTtlTableReq
ttlReq
=
{.
timestamp
=
taosGetTimestampSec
()};
int32_t
reqLen
=
tSerializeSVDropTtlTableReq
(
NULL
,
0
,
&
ttlReq
);
int32_t
contLen
=
reqLen
+
sizeof
(
SMsgHead
);
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
int32_t
contLen
=
sizeof
(
SMsgHead
)
+
sizeof
(
int32_t
);
SMsgHead
*
pHead
=
rpcMallocCont
(
contLen
);
if
(
pHead
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pVgroup
);
...
...
@@ -831,17 +833,15 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
}
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
pVgroup
->
vgId
);
int32_t
t
=
taosGetTimestampSec
();
*
(
int32_t
*
)((
char
*
)
pHead
+
sizeof
(
SMsgHead
))
=
htonl
(
t
);
tSerializeSVDropTtlTableReq
((
char
*
)
pHead
+
sizeof
(
SMsgHead
),
contLen
,
&
ttlReq
);
SRpcMsg
rpcMsg
=
{.
msgType
=
TDMT_VND_DROP_TTL_TABLE
,
.
pCont
=
pHead
,
.
contLen
=
contLen
};
SEpSet
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
int32_t
code
=
tmsgSendReq
(
&
epSet
,
&
rpcMsg
);
if
(
code
!=
0
)
{
mError
(
"
failed to send ttl time seed, code:0x%x"
,
code
);
mError
(
"
vgId:%d, failed to send drop ttl table request to vnode since 0x%x"
,
pVgroup
->
vgId
,
code
);
}
else
{
mDebug
(
"
send ttl time seed success, time:%d"
,
t
);
mDebug
(
"
vgId:%d, send drop ttl table request to vnode, time:%d"
,
pVgroup
->
vgId
,
ttlReq
.
timestamp
);
}
sdbRelease
(
pSdb
,
pVgroup
);
}
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
080f30d0
...
...
@@ -307,7 +307,7 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
contLen
+=
+
sizeof
(
SMsgHead
);
contLen
+=
sizeof
(
SMsgHead
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
if
(
pReq
==
NULL
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
080f30d0
...
...
@@ -26,6 +26,7 @@ static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *
static
int32_t
vnodeProcessAlterConfirmReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessAlterHasnRangeReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessDropTtlTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessTrimReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessDeleteReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
int32_t
vnodePreProcessWriteMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
...
...
@@ -172,9 +173,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
case
TDMT_VND_DROP_TTL_TABLE
:
if
(
vnodeProcessDropTtlTbReq
(
pVnode
,
version
,
pReq
,
len
,
pRsp
)
<
0
)
goto
_err
;
break
;
case
TDMT_VND_CREATE_SMA
:
{
case
TDMT_VND_TRIM
:
if
(
vnodeProcessTrimReq
(
pVnode
,
version
,
pReq
,
len
,
pRsp
)
<
0
)
goto
_err
;
break
;
case
TDMT_VND_CREATE_SMA
:
if
(
vnodeProcessCreateTSmaReq
(
pVnode
,
version
,
pReq
,
len
,
pRsp
)
<
0
)
goto
_err
;
}
break
;
break
;
/* TSDB */
case
TDMT_VND_SUBMIT
:
if
(
vnodeProcessSubmitReq
(
pVnode
,
version
,
pMsg
->
pCont
,
pMsg
->
contLen
,
pRsp
)
<
0
)
goto
_err
;
...
...
@@ -345,13 +349,35 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp
->
precision
=
pVnode
->
config
.
tsdbCfg
.
precision
;
}
static
int32_t
vnodeProcessTrimReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SVTrimDbReq
trimReq
=
{
0
};
if
(
tDeserializeSVTrimDbReq
(
pReq
,
len
,
&
trimReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
end
;
}
vInfo
(
"vgId:%d, trim vnode request will be processed, time:%d"
,
pVnode
->
config
.
vgId
,
trimReq
.
timestamp
);
int32_t
ret
=
0
;
if
(
ret
!=
0
)
{
goto
end
;
}
end:
return
ret
;
}
static
int32_t
vnodeProcessDropTtlTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SArray
*
tbUids
=
taosArrayInit
(
8
,
sizeof
(
int64_t
));
if
(
tbUids
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
t
=
ntohl
(
*
(
int32_t
*
)
pReq
);
vDebug
(
"rec ttl time:%d"
,
t
);
int32_t
ret
=
metaTtlDropTable
(
pVnode
->
pMeta
,
t
,
tbUids
);
SVDropTtlTableReq
ttlReq
=
{
0
};
if
(
tDeserializeSVDropTtlTableReq
(
pReq
,
len
,
&
ttlReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
end
;
}
vInfo
(
"vgId:%d, drop ttl table req will be processed, time:%d"
,
pVnode
->
config
.
vgId
,
ttlReq
.
timestamp
);
int32_t
ret
=
metaTtlDropTable
(
pVnode
->
pMeta
,
ttlReq
.
timestamp
,
tbUids
);
if
(
ret
!=
0
)
{
goto
end
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录