Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7b4bb723
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
7b4bb723
编写于
8月 18, 2023
作者:
M
Minglei Jin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(vnode/destroy): delete objects
上级
a412e9f0
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
114 addition
and
91 deletion
+114
-91
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+51
-54
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+1
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+26
-25
source/dnode/vnode/src/inc/vndCos.h
source/dnode/vnode/src/inc/vndCos.h
+1
-0
source/dnode/vnode/src/vnd/vnodeCos.c
source/dnode/vnode/src/vnd/vnodeCos.c
+20
-0
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+15
-11
未找到文件。
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
7b4bb723
...
...
@@ -144,7 +144,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
);
pCfg
->
syncCfg
.
replicaNum
++
;
}
if
(
pCreate
->
selfIndex
!=
-
1
)
{
if
(
pCreate
->
selfIndex
!=
-
1
)
{
pCfg
->
syncCfg
.
myIndex
=
pCreate
->
selfIndex
;
}
for
(
int32_t
i
=
pCfg
->
syncCfg
.
replicaNum
;
i
<
pCreate
->
replica
+
pCreate
->
learnerReplica
;
++
i
)
{
...
...
@@ -157,7 +157,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg
->
syncCfg
.
totalReplicaNum
++
;
}
pCfg
->
syncCfg
.
totalReplicaNum
+=
pCfg
->
syncCfg
.
replicaNum
;
if
(
pCreate
->
learnerSelfIndex
!=
-
1
)
{
if
(
pCreate
->
learnerSelfIndex
!=
-
1
)
{
pCfg
->
syncCfg
.
myIndex
=
pCreate
->
replica
+
pCreate
->
learnerSelfIndex
;
}
}
...
...
@@ -201,24 +201,24 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
-
1
;
}
if
(
req
.
learnerReplica
==
0
)
{
if
(
req
.
learnerReplica
==
0
)
{
req
.
learnerSelfIndex
=
-
1
;
}
dInfo
(
"vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%"
PRIu64
", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%"
PRId64
dInfo
(
"vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
"szBuf:%"
PRIu64
", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%"
PRId64
", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%"
PRId64
" rollPeriod:%d segSize:%"
PRId64
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
"learnerReplica:%d learnerSelfIndex:%d strict:%d"
,
req
.
vgId
,
TMSG_INFO
(
pMsg
->
msgType
),
req
.
pages
,
req
.
pageSize
,
req
.
buffer
,
req
.
pageSize
*
1024
,
(
uint64_t
)
req
.
buffer
*
1024
*
1024
,
req
.
cacheLast
,
req
.
cacheLastSize
,
req
.
sstTrigger
,
req
.
tsdbPageSize
,
req
.
tsdbPageSize
*
1024
,
req
.
db
,
req
.
dbUid
,
req
.
daysPerFile
,
req
.
daysToKeep0
,
req
.
daysToKeep1
,
req
.
daysToKeep2
,
req
.
isTsma
,
req
.
precision
,
req
.
compression
,
req
.
minRows
,
req
.
maxRows
,
req
.
walFsyncPeriod
,
req
.
walLevel
,
req
.
walRetentionPeriod
,
req
.
walRetentionSize
,
req
.
walRollPeriod
,
req
.
walSegmentSize
,
req
.
hashMethod
,
req
.
hashBegin
,
req
.
hashEnd
,
req
.
hashPrefix
,
req
.
hashSuffix
,
req
.
replica
,
req
.
selfIndex
,
req
.
learnerReplica
,
req
.
learnerSelfIndex
,
req
.
strict
);
(
uint64_t
)
req
.
buffer
*
1024
*
1024
,
req
.
cacheLast
,
req
.
cacheLastSize
,
req
.
sstTrigger
,
req
.
tsdbPageSize
,
req
.
tsdbPageSize
*
1024
,
req
.
db
,
req
.
dbUid
,
req
.
daysPerFile
,
req
.
daysToKeep0
,
req
.
daysToKeep1
,
req
.
daysToKeep2
,
req
.
isTsma
,
req
.
precision
,
req
.
compression
,
req
.
minRows
,
req
.
maxRows
,
req
.
walFsyncPeriod
,
req
.
walLevel
,
req
.
walRetentionPeriod
,
req
.
walRetentionSize
,
req
.
walRollPeriod
,
req
.
walSegmentSize
,
req
.
hashMethod
,
req
.
hashBegin
,
req
.
hashEnd
,
req
.
hashPrefix
,
req
.
hashSuffix
,
req
.
replica
,
req
.
selfIndex
,
req
.
learnerReplica
,
req
.
learnerSelfIndex
,
req
.
strict
);
for
(
int32_t
i
=
0
;
i
<
req
.
replica
;
++
i
)
{
dInfo
(
"vgId:%d, replica:%d ep:%s:%u dnode:%d"
,
req
.
vgId
,
i
,
req
.
replicas
[
i
].
fqdn
,
req
.
replicas
[
i
].
port
,
req
.
replicas
[
i
].
id
);
...
...
@@ -229,10 +229,9 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
SReplica
*
pReplica
=
NULL
;
if
(
req
.
selfIndex
!=
-
1
)
{
if
(
req
.
selfIndex
!=
-
1
)
{
pReplica
=
&
req
.
replicas
[
req
.
selfIndex
];
}
else
{
}
else
{
pReplica
=
&
req
.
learnerReplicas
[
req
.
learnerSelfIndex
];
}
if
(
pReplica
->
id
!=
pMgmt
->
pData
->
dnodeId
||
pReplica
->
port
!=
tsServerPort
||
...
...
@@ -313,10 +312,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
_OVER:
if
(
code
!=
0
)
{
vnodeClose
(
pImpl
);
vnodeDestroy
(
path
,
pMgmt
->
pTfs
);
vnodeDestroy
(
0
,
path
,
pMgmt
->
pTfs
);
}
else
{
dInfo
(
"vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created"
,
req
.
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
dInfo
(
"vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created"
,
req
.
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
}
tFreeSCreateVnodeReq
(
&
req
);
...
...
@@ -331,12 +330,12 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
-
1
;
}
if
(
req
.
learnerReplicas
==
0
)
{
if
(
req
.
learnerReplicas
==
0
)
{
req
.
learnerSelfIndex
=
-
1
;
}
dInfo
(
"vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request"
,
req
.
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
dInfo
(
"vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request"
,
req
.
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
req
.
vgId
);
if
(
pVnode
==
NULL
)
{
...
...
@@ -347,7 +346,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
ESyncRole
role
=
vnodeGetRole
(
pVnode
->
pImpl
);
dInfo
(
"vgId:%d, checking node role:%d"
,
req
.
vgId
,
role
);
if
(
role
==
TAOS_SYNC_ROLE_VOTER
)
{
if
(
role
==
TAOS_SYNC_ROLE_VOTER
)
{
dError
(
"vgId:%d, failed to alter vnode type since node already is role:%d"
,
req
.
vgId
,
role
);
terrno
=
TSDB_CODE_VND_ALREADY_IS_VOTER
;
vmReleaseVnode
(
pMgmt
,
pVnode
);
...
...
@@ -355,7 +354,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo
(
"vgId:%d, checking node catch up"
,
req
.
vgId
);
if
(
vnodeIsCatchUp
(
pVnode
->
pImpl
)
!=
1
)
{
if
(
vnodeIsCatchUp
(
pVnode
->
pImpl
)
!=
1
)
{
terrno
=
TSDB_CODE_VND_NOT_CATCH_UP
;
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
-
1
;
...
...
@@ -375,9 +374,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo
(
"vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d"
,
vgId
,
i
,
pReplica
->
fqdn
,
pReplica
->
port
,
pReplica
->
id
);
}
if
(
req
.
replica
<=
0
||
(
req
.
selfIndex
<
0
&&
req
.
learnerSelfIndex
<
0
)
||
req
.
selfIndex
>=
req
.
replica
||
req
.
learnerSelfIndex
>=
req
.
learnerReplica
)
{
if
(
req
.
replica
<=
0
||
(
req
.
selfIndex
<
0
&&
req
.
learnerSelfIndex
<
0
)
||
req
.
selfIndex
>=
req
.
replica
||
req
.
learnerSelfIndex
>=
req
.
learnerReplica
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
dError
(
"vgId:%d, failed to alter replica since invalid msg"
,
vgId
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
...
...
@@ -385,10 +383,9 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
SReplica
*
pReplica
=
NULL
;
if
(
req
.
selfIndex
!=
-
1
)
{
if
(
req
.
selfIndex
!=
-
1
)
{
pReplica
=
&
req
.
replicas
[
req
.
selfIndex
];
}
else
{
}
else
{
pReplica
=
&
req
.
learnerReplicas
[
req
.
learnerSelfIndex
];
}
...
...
@@ -555,12 +552,13 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
-
1
;
}
if
(
alterReq
.
learnerReplica
==
0
)
{
if
(
alterReq
.
learnerReplica
==
0
)
{
alterReq
.
learnerSelfIndex
=
-
1
;
}
int32_t
vgId
=
alterReq
.
vgId
;
dInfo
(
"vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
dInfo
(
"vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d"
,
vgId
,
TMSG_INFO
(
pMsg
->
msgType
),
alterReq
.
replica
,
alterReq
.
selfIndex
,
alterReq
.
learnerReplica
,
alterReq
.
learnerSelfIndex
,
alterReq
.
strict
);
...
...
@@ -573,8 +571,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo
(
"vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d"
,
vgId
,
i
,
pReplica
->
fqdn
,
pReplica
->
port
,
pReplica
->
port
);
}
if
(
alterReq
.
replica
<=
0
||
(
alterReq
.
selfIndex
<
0
&&
alterReq
.
learnerSelfIndex
<
0
)
||
if
(
alterReq
.
replica
<=
0
||
(
alterReq
.
selfIndex
<
0
&&
alterReq
.
learnerSelfIndex
<
0
)
||
alterReq
.
selfIndex
>=
alterReq
.
replica
||
alterReq
.
learnerSelfIndex
>=
alterReq
.
learnerReplica
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
dError
(
"vgId:%d, failed to alter replica since invalid msg"
,
vgId
);
...
...
@@ -582,10 +579,9 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
SReplica
*
pReplica
=
NULL
;
if
(
alterReq
.
selfIndex
!=
-
1
)
{
if
(
alterReq
.
selfIndex
!=
-
1
)
{
pReplica
=
&
alterReq
.
replicas
[
alterReq
.
selfIndex
];
}
else
{
}
else
{
pReplica
=
&
alterReq
.
learnerReplicas
[
alterReq
.
learnerSelfIndex
];
}
...
...
@@ -641,7 +637,8 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
-
1
;
}
dInfo
(
"vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
dInfo
(
"vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d"
,
vgId
,
TMSG_INFO
(
pMsg
->
msgType
),
alterReq
.
replica
,
alterReq
.
selfIndex
,
alterReq
.
learnerReplica
,
alterReq
.
learnerSelfIndex
,
alterReq
.
strict
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
7b4bb723
...
...
@@ -208,7 +208,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
if
(
pVnode
->
dropped
)
{
dInfo
(
"vgId:%d, vnode is destroyed, dropped:%d"
,
pVnode
->
vgId
,
pVnode
->
dropped
);
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
pVnode
->
vgId
);
vnodeDestroy
(
path
,
pMgmt
->
pTfs
);
vnodeDestroy
(
p
Vnode
->
vgId
,
p
ath
,
pMgmt
->
pTfs
);
}
taosMemoryFree
(
pVnode
->
path
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
7b4bb723
...
...
@@ -33,8 +33,8 @@
#include "tmsg.h"
#include "trow.h"
#include "tdb.h"
#include "storageapi.h"
#include "tdb.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -57,7 +57,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
int32_t
diskPrimary
,
STfs
*
pTfs
);
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
int32_t
diskPrimary
,
STfs
*
pTfs
);
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
);
void
vnodeDestroy
(
int32_t
vgId
,
const
char
*
path
,
STfs
*
pTfs
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
void
vnodePreClose
(
SVnode
*
pVnode
);
void
vnodePostClose
(
SVnode
*
pVnode
);
...
...
@@ -70,9 +70,9 @@ int32_t vnodeStart(SVnode *pVnode);
void
vnodeStop
(
SVnode
*
pVnode
);
int64_t
vnodeGetSyncHandle
(
SVnode
*
pVnode
);
void
vnodeGetSnapshot
(
SVnode
*
pVnode
,
SSnapshot
*
pSnapshot
);
void
vnodeGetInfo
(
void
*
pVnode
,
const
char
**
dbname
,
int32_t
*
vgId
,
int64_t
*
numOfTables
,
int64_t
*
numOfNormalTables
);
void
vnodeGetInfo
(
void
*
pVnode
,
const
char
**
dbname
,
int32_t
*
vgId
,
int64_t
*
numOfTables
,
int64_t
*
numOfNormalTables
);
int32_t
vnodeProcessCreateTSma
(
SVnode
*
pVnode
,
void
*
pCont
,
uint32_t
contLen
);
int32_t
vnodeGetTableList
(
void
*
pVnode
,
int8_t
type
,
SArray
*
pList
);
int32_t
vnodeGetTableList
(
void
*
pVnode
,
int8_t
type
,
SArray
*
pList
);
int32_t
vnodeGetAllTableList
(
SVnode
*
pVnode
,
uint64_t
uid
,
SArray
*
list
);
int32_t
vnodeIsCatchUp
(
SVnode
*
pVnode
);
ESyncRole
vnodeGetRole
(
SVnode
*
pVnode
);
...
...
@@ -80,7 +80,8 @@ ESyncRole vnodeGetRole(SVnode *pVnode);
int32_t
vnodeGetCtbIdList
(
void
*
pVnode
,
int64_t
suid
,
SArray
*
list
);
int32_t
vnodeGetCtbIdListByFilter
(
SVnode
*
pVnode
,
int64_t
suid
,
SArray
*
list
,
bool
(
*
filter
)(
void
*
arg
),
void
*
arg
);
int32_t
vnodeGetStbIdList
(
SVnode
*
pVnode
,
int64_t
suid
,
SArray
*
list
);
int32_t
vnodeGetStbIdListByFilter
(
SVnode
*
pVnode
,
int64_t
suid
,
SArray
*
list
,
bool
(
*
filter
)(
void
*
arg
,
void
*
arg1
),
void
*
arg
);
int32_t
vnodeGetStbIdListByFilter
(
SVnode
*
pVnode
,
int64_t
suid
,
SArray
*
list
,
bool
(
*
filter
)(
void
*
arg
,
void
*
arg1
),
void
*
arg
);
void
*
vnodeGetIdx
(
void
*
pVnode
);
void
*
vnodeGetIvtIdx
(
void
*
pVnode
);
...
...
@@ -105,13 +106,13 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs
void
vnodeProposeCommitOnNeed
(
SVnode
*
pVnode
,
bool
atExit
);
// meta
void
_metaReaderInit
(
SMetaReader
*
pReader
,
void
*
pVnode
,
int32_t
flags
,
SStoreMeta
*
pAPI
);
void
_metaReaderInit
(
SMetaReader
*
pReader
,
void
*
pVnode
,
int32_t
flags
,
SStoreMeta
*
pAPI
);
void
metaReaderReleaseLock
(
SMetaReader
*
pReader
);
void
metaReaderClear
(
SMetaReader
*
pReader
);
int32_t
metaReaderGetTableEntryByUid
(
SMetaReader
*
pReader
,
tb_uid_t
uid
);
int32_t
metaReaderGetTableEntryByUidCache
(
SMetaReader
*
pReader
,
tb_uid_t
uid
);
int32_t
metaGetTableTags
(
void
*
pVnode
,
uint64_t
suid
,
SArray
*
uidList
);
int32_t
metaGetTableTagsByUids
(
void
*
pVnode
,
int64_t
suid
,
SArray
*
uidList
);
int32_t
metaGetTableTagsByUids
(
void
*
pVnode
,
int64_t
suid
,
SArray
*
uidList
);
int32_t
metaReadNext
(
SMetaReader
*
pReader
);
const
void
*
metaGetTableTagVal
(
const
void
*
tag
,
int16_t
type
,
STagVal
*
tagVal
);
int
metaGetTableNameByUid
(
void
*
meta
,
uint64_t
uid
,
char
*
tbName
);
...
...
@@ -120,14 +121,14 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
int
metaGetTableUidByName
(
void
*
pVnode
,
char
*
tbName
,
uint64_t
*
uid
);
int
metaGetTableTypeByName
(
void
*
meta
,
char
*
tbName
,
ETableType
*
tbType
);
int
metaGetTableTtlByUid
(
void
*
meta
,
uint64_t
uid
,
int64_t
*
ttlDays
);
bool
metaIsTableExist
(
void
*
pVnode
,
tb_uid_t
uid
);
bool
metaIsTableExist
(
void
*
pVnode
,
tb_uid_t
uid
);
int32_t
metaGetCachedTableUidList
(
void
*
pVnode
,
tb_uid_t
suid
,
const
uint8_t
*
key
,
int32_t
keyLen
,
SArray
*
pList
,
bool
*
acquired
);
int32_t
metaUidFilterCachePut
(
void
*
pVnode
,
uint64_t
suid
,
const
void
*
pKey
,
int32_t
keyLen
,
void
*
pPayload
,
int32_t
payloadLen
,
double
selectivityRatio
);
tb_uid_t
metaGetTableEntryUidByName
(
SMeta
*
pMeta
,
const
char
*
name
);
int32_t
metaGetCachedTbGroup
(
void
*
pVnode
,
tb_uid_t
suid
,
const
uint8_t
*
pKey
,
int32_t
keyLen
,
SArray
**
pList
);
int32_t
metaPutTbGroupToCache
(
void
*
pVnode
,
uint64_t
suid
,
const
void
*
pKey
,
int32_t
keyLen
,
void
*
pPayload
,
int32_t
metaPutTbGroupToCache
(
void
*
pVnode
,
uint64_t
suid
,
const
void
*
pKey
,
int32_t
keyLen
,
void
*
pPayload
,
int32_t
payloadLen
);
bool
metaTbInFilterCache
(
void
*
pVnode
,
tb_uid_t
suid
,
int8_t
type
);
int32_t
metaPutTbToFilterCache
(
void
*
pVnode
,
tb_uid_t
suid
,
int8_t
type
);
...
...
@@ -228,23 +229,23 @@ STqReader *tqReaderOpen(SVnode *pVnode);
void
tqReaderClose
(
STqReader
*
);
void
tqReaderSetColIdList
(
STqReader
*
pReader
,
SArray
*
pColIdList
);
int32_t
tqReaderSetTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
,
const
char
*
id
);
int32_t
tqReaderSetTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
,
const
char
*
id
);
int32_t
tqReaderAddTbUidList
(
STqReader
*
pReader
,
const
SArray
*
pTableUidList
);
int32_t
tqReaderRemoveTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
);
bool
tqReaderIsQueriedTable
(
STqReader
*
pReader
,
uint64_t
uid
);
bool
tqCurrentBlockConsumed
(
const
STqReader
*
pReader
);
bool
tqReaderIsQueriedTable
(
STqReader
*
pReader
,
uint64_t
uid
);
bool
tqCurrentBlockConsumed
(
const
STqReader
*
pReader
);
int32_t
tqReaderSeek
(
STqReader
*
pReader
,
int64_t
ver
,
const
char
*
id
);
bool
tqNextBlockInWal
(
STqReader
*
pReader
,
const
char
*
idstr
);
bool
tqNextBlockImpl
(
STqReader
*
pReader
,
const
char
*
idstr
);
SWalReader
*
tqGetWalReader
(
STqReader
*
pReader
);
SSDataBlock
*
tqGetResultBlock
(
STqReader
*
pReader
);
SWalReader
*
tqGetWalReader
(
STqReader
*
pReader
);
SSDataBlock
*
tqGetResultBlock
(
STqReader
*
pReader
);
int32_t
extractMsgFromWal
(
SWalReader
*
pReader
,
void
**
pItem
,
int64_t
maxVer
,
const
char
*
id
);
int32_t
tqReaderSetSubmitMsg
(
STqReader
*
pReader
,
void
*
msgStr
,
int32_t
msgLen
,
int64_t
ver
);
bool
tqNextDataBlockFilterOut
(
STqReader
*
pReader
,
SHashObj
*
filterOutUids
);
int32_t
tqRetrieveDataBlock
(
STqReader
*
pReader
,
SSDataBlock
**
pRes
,
const
char
*
idstr
);
int32_t
tqRetrieveDataBlock
(
STqReader
*
pReader
,
SSDataBlock
**
pRes
,
const
char
*
idstr
);
int32_t
tqRetrieveTaosxBlock
(
STqReader
*
pReader
,
SArray
*
blocks
,
SArray
*
schemas
,
SSubmitTbData
**
pSubmitTbDataRet
);
int32_t
vnodeEnqueueStreamMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
...
...
source/dnode/vnode/src/inc/vndCos.h
浏览文件 @
7b4bb723
...
...
@@ -27,6 +27,7 @@ extern int8_t tsS3Enabled;
int32_t
s3Init
();
void
s3CleanUp
();
int32_t
s3PutObjectFromFile
(
const
char
*
file
,
const
char
*
object
);
void
s3DeleteObjectsByPrefix
(
const
char
*
prefix
);
void
s3DeleteObjects
(
const
char
*
object_name
[],
int
nobject
);
bool
s3Exists
(
const
char
*
object_name
);
bool
s3Get
(
const
char
*
object_name
,
const
char
*
path
);
...
...
source/dnode/vnode/src/vnd/vnodeCos.c
浏览文件 @
7b4bb723
...
...
@@ -85,6 +85,25 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
return
code
;
}
void
s3DeleteObjectsByPrefix
(
const
char
*
prefix_str
)
{
cos_pool_t
*
p
=
NULL
;
cos_request_options_t
*
options
=
NULL
;
int
is_cname
=
0
;
cos_string_t
bucket
;
cos_status_t
*
s
=
NULL
;
cos_string_t
prefix
;
cos_pool_create
(
&
p
,
NULL
);
options
=
cos_request_options_create
(
p
);
s3InitRequestOptions
(
options
,
is_cname
);
cos_str_set
(
&
bucket
,
tsS3BucketName
);
cos_str_set
(
&
prefix
,
prefix_str
);
s
=
cos_delete_objects_by_prefix
(
options
,
&
bucket
,
&
prefix
);
log_status
(
s
);
cos_pool_destroy
(
p
);
}
void
s3DeleteObjects
(
const
char
*
object_name
[],
int
nobject
)
{
cos_pool_t
*
p
=
NULL
;
int
is_cname
=
0
;
...
...
@@ -314,6 +333,7 @@ long s3Size(const char *object_name) {
int32_t
s3Init
()
{
return
0
;
}
void
s3CleanUp
()
{}
int32_t
s3PutObjectFromFile
(
const
char
*
file
,
const
char
*
object
)
{
return
0
;
}
void
s3DeleteObjectsByPrefix
(
const
char
*
prefix
)
{}
void
s3DeleteObjects
(
const
char
*
object_name
[],
int
nobject
)
{}
bool
s3Exists
(
const
char
*
object_name
)
{
return
false
;
}
bool
s3Get
(
const
char
*
object_name
,
const
char
*
path
)
{
return
false
;
}
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
7b4bb723
...
...
@@ -14,6 +14,7 @@
*/
#include "vnd.h"
#include "vndCos.h"
int32_t
vnodeGetPrimaryDir
(
const
char
*
relPath
,
int32_t
diskPrimary
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
)
{
if
(
pTfs
)
{
...
...
@@ -100,7 +101,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
vInfo
(
"vgId:%d, replica:%d ep:%s:%u dnode:%d"
,
pReq
->
vgId
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
,
pNode
->
nodeId
);
pCfg
->
replicaNum
++
;
}
if
(
pReq
->
selfIndex
!=
-
1
)
{
if
(
pReq
->
selfIndex
!=
-
1
)
{
pCfg
->
myIndex
=
pReq
->
selfIndex
;
}
for
(
int
i
=
pCfg
->
replicaNum
;
i
<
pReq
->
replica
+
pReq
->
learnerReplica
;
++
i
)
{
...
...
@@ -114,12 +115,12 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
pCfg
->
totalReplicaNum
++
;
}
pCfg
->
totalReplicaNum
+=
pReq
->
replica
;
if
(
pReq
->
learnerSelfIndex
!=
-
1
)
{
if
(
pReq
->
learnerSelfIndex
!=
-
1
)
{
pCfg
->
myIndex
=
pReq
->
replica
+
pReq
->
learnerSelfIndex
;
}
vInfo
(
"vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d"
,
pReq
->
vgId
,
pCfg
->
replicaNum
,
pCfg
->
totalReplicaNum
,
pCfg
->
myIndex
);
vInfo
(
"vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d"
,
pReq
->
vgId
,
pCfg
->
replicaNum
,
pCfg
->
totalReplicaNum
,
pCfg
->
myIndex
);
info
.
config
.
syncCfg
=
*
pCfg
;
ret
=
vnodeSaveInfo
(
dir
,
&
info
);
...
...
@@ -293,9 +294,16 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
return
dstVgId
;
}
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
)
{
void
vnodeDestroy
(
int32_t
vgId
,
const
char
*
path
,
STfs
*
pTfs
)
{
vInfo
(
"path:%s is removed while destroy vnode"
,
path
);
tfsRmdir
(
pTfs
,
path
);
int32_t
nlevel
=
tfsGetLevel
(
pTfs
);
if
(
vgId
>
0
&&
nlevel
>
1
&&
tsS3Enabled
)
{
char
vnode_prefix
[
TSDB_FILENAME_LEN
];
snprintf
(
vnode_prefix
,
TSDB_FILENAME_LEN
,
"v%df"
,
vgId
);
s3DeleteObjectsByPrefix
(
vnode_prefix
);
}
}
static
int32_t
vnodeCheckDisk
(
int32_t
diskPrimary
,
STfs
*
pTfs
)
{
...
...
@@ -497,13 +505,9 @@ void vnodeClose(SVnode *pVnode) {
// start the sync timer after the queue is ready
int32_t
vnodeStart
(
SVnode
*
pVnode
)
{
return
vnodeSyncStart
(
pVnode
);
}
int32_t
vnodeIsCatchUp
(
SVnode
*
pVnode
){
return
syncIsCatchUp
(
pVnode
->
sync
);
}
int32_t
vnodeIsCatchUp
(
SVnode
*
pVnode
)
{
return
syncIsCatchUp
(
pVnode
->
sync
);
}
ESyncRole
vnodeGetRole
(
SVnode
*
pVnode
){
return
syncGetRole
(
pVnode
->
sync
);
}
ESyncRole
vnodeGetRole
(
SVnode
*
pVnode
)
{
return
syncGetRole
(
pVnode
->
sync
);
}
void
vnodeStop
(
SVnode
*
pVnode
)
{}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录