Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5a7b938b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5a7b938b
编写于
11月 22, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
11月 22, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4312 from taosdata/feature/wal
[TD-2166]<fix>: vnode may be deleted during synchronization
上级
a1d247af
94e662c9
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
199 addition
and
141 deletion
+199
-141
src/dnode/src/dnodeEps.c
src/dnode/src/dnodeEps.c
+1
-1
src/inc/tsync.h
src/inc/tsync.h
+8
-9
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+8
-9
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+2
-2
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+5
-6
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+33
-34
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+7
-7
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+5
-5
src/sync/test/syncServer.c
src/sync/test/syncServer.c
+6
-7
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+119
-55
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+1
-2
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+4
-4
未找到文件。
src/dnode/src/dnodeEps.c
浏览文件 @
5a7b938b
...
...
@@ -33,7 +33,7 @@ static void dnodePrintEps(SDnodeEps *eps);
int32_t
dnodeInitEps
()
{
pthread_mutex_init
(
&
tsEpsMutex
,
NULL
);
tsEpsHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
true
);
tsEpsHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
dnodeResetEps
(
NULL
);
int32_t
ret
=
dnodeReadEps
();
if
(
ret
==
0
)
{
...
...
src/inc/tsync.h
浏览文件 @
5a7b938b
...
...
@@ -64,33 +64,32 @@ typedef struct {
if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return
zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated.
*/
typedef
uint32_t
(
*
FGetFileInfo
)(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
);
typedef
uint32_t
(
*
FGetFileInfo
)(
int32_t
vgId
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
);
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef
int32_t
(
*
FGetWalInfo
)(
void
*
ahandle
,
char
*
fileName
,
int64_t
*
fileId
);
typedef
int32_t
(
*
FGetWalInfo
)(
int32_t
vgId
,
char
*
fileName
,
int64_t
*
fileId
);
// when a forward pkt is received, call this to handle data
typedef
int32_t
(
*
FWriteToCache
)(
void
*
ahandle
,
void
*
pHead
,
int32_t
qtype
,
void
*
pMsg
);
typedef
int32_t
(
*
FWriteToCache
)(
int32_t
vgId
,
void
*
pHead
,
int32_t
qtype
,
void
*
pMsg
);
// when forward is confirmed by peer, master call this API to notify app
typedef
void
(
*
FConfirmForward
)(
void
*
ahandle
,
void
*
mhandle
,
int32_t
code
);
typedef
void
(
*
FConfirmForward
)(
int32_t
vgId
,
void
*
mhandle
,
int32_t
code
);
// when role is changed, call this to notify app
typedef
void
(
*
FNotifyRole
)(
void
*
ahandle
,
int8_t
role
);
typedef
void
(
*
FNotifyRole
)(
int32_t
vgId
,
int8_t
role
);
// if a number of retrieving data failed, call this to start flow control
typedef
void
(
*
FNotifyFlowCtrl
)(
void
*
ahandle
,
int32_t
mseconds
);
typedef
void
(
*
FNotifyFlowCtrl
)(
int32_t
vgId
,
int32_t
mseconds
);
// when data file is synced successfully, notity app
typedef
int32_t
(
*
FNotifyFileSynced
)(
void
*
ahandle
,
uint64_t
fversion
);
typedef
int32_t
(
*
FNotifyFileSynced
)(
int32_t
vgId
,
uint64_t
fversion
);
typedef
struct
{
int32_t
vgId
;
// vgroup ID
uint64_t
version
;
// initial version
SSyncCfg
syncCfg
;
// configuration from mgmt
char
path
[
128
];
// path to the file
void
*
ahandle
;
// handle provided by APP
char
path
[
TSDB_FILENAME_LEN
];
// path to the file
FGetFileInfo
getFileInfo
;
FGetWalInfo
getWalInfo
;
FWriteToCache
writeToCache
;
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
5a7b938b
...
...
@@ -107,7 +107,7 @@ static taos_queue tsSdbWQueue;
static
SSdbWorkerPool
tsSdbPool
;
static
int32_t
sdbProcessWrite
(
void
*
pRow
,
void
*
pHead
,
int32_t
qtype
,
void
*
unused
);
static
int32_t
sdbWriteWalToQueue
(
void
*
vparam
,
void
*
pHead
,
int32_t
qtype
,
void
*
rparam
);
static
int32_t
sdbWriteWalToQueue
(
int32_t
vgId
,
void
*
pHead
,
int32_t
qtype
,
void
*
rparam
);
static
int32_t
sdbWriteRowToQueue
(
SSdbRow
*
pRow
,
int32_t
action
);
static
void
sdbFreeFromQueue
(
SSdbRow
*
pRow
);
static
void
*
sdbWorkerFp
(
void
*
pWorker
);
...
...
@@ -228,16 +228,16 @@ void sdbUpdateMnodeRoles() {
mnodeUpdateMnodeEpSet
();
}
static
uint32_t
sdbGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
)
{
static
uint32_t
sdbGetFileInfo
(
int32_t
vgId
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
)
{
sdbUpdateMnodeRoles
();
return
0
;
}
static
int32_t
sdbGetWalInfo
(
void
*
ahandle
,
char
*
fileName
,
int64_t
*
fileId
)
{
static
int32_t
sdbGetWalInfo
(
int32_t
vgId
,
char
*
fileName
,
int64_t
*
fileId
)
{
return
walGetWalFile
(
tsSdbMgmt
.
wal
,
fileName
,
fileId
);
}
static
void
sdbNotifyRole
(
void
*
ahandle
,
int8_t
role
)
{
static
void
sdbNotifyRole
(
int32_t
vgId
,
int8_t
role
)
{
sdbInfo
(
"vgId:1, mnode role changed from %s to %s"
,
syncRole
[
tsSdbMgmt
.
role
],
syncRole
[
role
]);
if
(
role
==
TAOS_SYNC_ROLE_MASTER
&&
tsSdbMgmt
.
role
!=
TAOS_SYNC_ROLE_MASTER
)
{
...
...
@@ -264,7 +264,7 @@ static void sdbHandleFailedConfirm(SSdbRow *pRow) {
}
FORCE_INLINE
static
void
sdbConfirmForward
(
void
*
ahandle
,
void
*
wparam
,
int32_t
code
)
{
static
void
sdbConfirmForward
(
int32_t
vgId
,
void
*
wparam
,
int32_t
code
)
{
if
(
wparam
==
NULL
)
return
;
SSdbRow
*
pRow
=
wparam
;
SMnodeMsg
*
pMsg
=
pRow
->
pMsg
;
...
...
@@ -370,7 +370,6 @@ void sdbUpdateSync(void *pMnodes) {
syncInfo
.
version
=
sdbGetVersion
();
syncInfo
.
syncCfg
=
syncCfg
;
sprintf
(
syncInfo
.
path
,
"%s"
,
tsMnodeDir
);
syncInfo
.
ahandle
=
NULL
;
syncInfo
.
getWalInfo
=
sdbGetWalInfo
;
syncInfo
.
getFileInfo
=
sdbGetFileInfo
;
syncInfo
.
writeToCache
=
sdbWriteWalToQueue
;
...
...
@@ -813,7 +812,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
hashFp
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
}
pTable
->
iHandle
=
taosHashInit
(
pTable
->
hashSessions
,
hashFp
,
true
,
true
);
pTable
->
iHandle
=
taosHashInit
(
pTable
->
hashSessions
,
hashFp
,
true
,
HASH_ENTRY_LOCK
);
tsSdbMgmt
.
numOfTables
++
;
tsSdbMgmt
.
tableList
[
pTable
->
id
]
=
pTable
;
...
...
@@ -967,7 +966,7 @@ static void sdbFreeFromQueue(SSdbRow *pRow) {
taosFreeQitem
(
pRow
);
}
static
int32_t
sdbWriteWalToQueue
(
void
*
vparam
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
)
{
static
int32_t
sdbWriteWalToQueue
(
int32_t
vgId
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
)
{
SWalHead
*
pHead
=
wparam
;
int32_t
size
=
sizeof
(
SSdbRow
)
+
sizeof
(
SWalHead
)
+
pHead
->
len
;
...
...
@@ -1039,7 +1038,7 @@ static void *sdbWorkerFp(void *pWorker) {
taosGetQitem
(
tsSdbWQall
,
&
qtype
,
(
void
**
)
&
pRow
);
if
(
qtype
==
TAOS_QTYPE_RPC
)
{
sdbConfirmForward
(
NULL
,
pRow
,
pRow
->
code
);
sdbConfirmForward
(
1
,
pRow
,
pRow
->
code
);
}
else
{
if
(
qtype
==
TAOS_QTYPE_FWD
)
{
syncConfirmForward
(
tsSdbMgmt
.
sync
,
pRow
->
pHead
->
version
,
pRow
->
code
);
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
5a7b938b
...
...
@@ -394,7 +394,7 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
atomic_add_fetch_32
(
&
pStable
->
numOfTables
,
1
);
if
(
pStable
->
vgHash
==
NULL
)
{
pStable
->
vgHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
false
);
pStable
->
vgHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
}
if
(
pStable
->
vgHash
!=
NULL
)
{
...
...
@@ -413,7 +413,7 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable)
SVgObj
*
pVgroup
=
mnodeGetVgroup
(
pCtable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
taosHashRemove
(
pStable
->
vgHash
,
(
char
*
)
&
pCtable
->
vgId
,
sizeof
(
pCtable
->
vgId
));
taosHashRemove
(
pStable
->
vgHash
,
&
pCtable
->
vgId
,
sizeof
(
pCtable
->
vgId
));
mDebug
(
"table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d"
,
pStable
->
info
.
tableId
,
pCtable
->
vgId
,
(
int32_t
)
taosHashGetSize
(
pStable
->
vgHash
));
}
...
...
src/sync/inc/syncInt.h
浏览文件 @
5a7b938b
...
...
@@ -153,13 +153,12 @@ typedef struct SSyncNode {
int8_t
selfIndex
;
uint32_t
vgId
;
int64_t
rid
;
void
*
ahandle
;
SSyncPeer
*
peerInfo
[
TAOS_SYNC_MAX_REPLICA
+
1
];
// extra one for arbitrator
SSyncPeer
*
pMaster
;
SSyncPeer
*
peerInfo
[
TAOS_SYNC_MAX_REPLICA
+
1
];
// extra one for arbitrator
SSyncPeer
*
pMaster
;
SRecvBuffer
*
pRecv
;
SSyncFwds
*
pSyncFwds
;
// saved forward info if quorum >1
void
*
pFwdTimer
;
void
*
pRoleTimer
;
SSyncFwds
*
pSyncFwds
;
// saved forward info if quorum >1
void
*
pFwdTimer
;
void
*
pRoleTimer
;
FGetFileInfo
getFileInfo
;
FGetWalInfo
getWalInfo
;
FWriteToCache
writeToCache
;
...
...
src/sync/src/syncMain.c
浏览文件 @
5a7b938b
...
...
@@ -100,7 +100,7 @@ uint16_t syncGenTranId() {
}
int32_t
syncInit
()
{
SPoolInfo
info
;
SPoolInfo
info
=
{
0
}
;
info
.
numOfThreads
=
tsSyncTcpThreads
;
info
.
serverIp
=
0
;
...
...
@@ -124,7 +124,7 @@ int32_t syncInit() {
return
-
1
;
}
tsVgIdHash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
true
);
tsVgIdHash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
tsVgIdHash
==
NULL
)
{
sError
(
"failed to init tsVgIdHash"
);
taosTmrCleanUp
(
tsSyncTmrCtrl
);
...
...
@@ -181,7 +181,6 @@ int64_t syncStart(const SSyncInfo *pInfo) {
tstrncpy
(
pNode
->
path
,
pInfo
->
path
,
sizeof
(
pNode
->
path
));
pthread_mutex_init
(
&
pNode
->
mutex
,
NULL
);
pNode
->
ahandle
=
pInfo
->
ahandle
;
pNode
->
getFileInfo
=
pInfo
->
getFileInfo
;
pNode
->
getWalInfo
=
pInfo
->
getWalInfo
;
pNode
->
writeToCache
=
pInfo
->
writeToCache
;
...
...
@@ -202,10 +201,10 @@ int64_t syncStart(const SSyncInfo *pInfo) {
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pCfg
->
replica
;
++
i
)
{
const
SNodeInfo
*
pNodeInfo
=
pCfg
->
nodeInfo
+
i
;
pNode
->
peerInfo
[
i
]
=
syncAddPeer
(
pNode
,
pNodeInfo
);
if
(
pNode
->
peerInfo
[
i
]
==
NULL
)
{
for
(
int32_t
i
ndex
=
0
;
index
<
pCfg
->
replica
;
++
index
)
{
const
SNodeInfo
*
pNodeInfo
=
pCfg
->
nodeInfo
+
i
ndex
;
pNode
->
peerInfo
[
i
ndex
]
=
syncAddPeer
(
pNode
,
pNodeInfo
);
if
(
pNode
->
peerInfo
[
i
ndex
]
==
NULL
)
{
sError
(
"vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd"
,
pNode
->
vgId
,
pNodeInfo
->
nodeId
,
pNodeInfo
->
nodeFqdn
,
pNodeInfo
->
nodePort
);
syncStop
(
pNode
->
rid
);
...
...
@@ -213,7 +212,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
}
if
((
strcmp
(
pNodeInfo
->
nodeFqdn
,
tsNodeFqdn
)
==
0
)
&&
(
pNodeInfo
->
nodePort
==
tsSyncPort
))
{
pNode
->
selfIndex
=
i
;
pNode
->
selfIndex
=
i
ndex
;
}
}
...
...
@@ -252,10 +251,10 @@ int64_t syncStart(const SSyncInfo *pInfo) {
}
syncAddArbitrator
(
pNode
);
taosHashPut
(
tsVgIdHash
,
(
const
char
*
)
&
pNode
->
vgId
,
sizeof
(
int32_t
),
(
char
*
)(
&
pNode
)
,
sizeof
(
SSyncNode
*
));
taosHashPut
(
tsVgIdHash
,
&
pNode
->
vgId
,
sizeof
(
int32_t
),
&
pNode
,
sizeof
(
SSyncNode
*
));
if
(
pNode
->
notifyRole
)
{
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
}
return
pNode
->
rid
;
...
...
@@ -269,14 +268,14 @@ void syncStop(int64_t rid) {
sInfo
(
"vgId:%d, cleanup sync"
,
pNode
->
vgId
);
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
if
(
tsVgIdHash
)
taosHashRemove
(
tsVgIdHash
,
(
const
char
*
)
&
pNode
->
vgId
,
sizeof
(
int32_t
));
if
(
tsVgIdHash
)
taosHashRemove
(
tsVgIdHash
,
&
pNode
->
vgId
,
sizeof
(
int32_t
));
if
(
pNode
->
pFwdTimer
)
taosTmrStop
(
pNode
->
pFwdTimer
);
if
(
pNode
->
pRoleTimer
)
taosTmrStop
(
pNode
->
pRoleTimer
);
for
(
int32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
for
(
int32_t
i
ndex
=
0
;
index
<
pNode
->
replica
;
++
index
)
{
pPeer
=
pNode
->
peerInfo
[
i
ndex
];
if
(
pPeer
)
syncRemovePeer
(
pPeer
);
}
...
...
@@ -298,7 +297,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
sInfo
(
"vgId:%d, reconfig, role:%s replica:%d old:%d"
,
pNode
->
vgId
,
syncRole
[
nodeRole
],
pNewCfg
->
replica
,
pNode
->
replica
);
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
for
(
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
j
=
0
;
j
<
pNewCfg
->
replica
;
++
j
)
{
...
...
@@ -348,7 +347,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
if
(
pNewCfg
->
replica
<=
1
)
{
sInfo
(
"vgId:%d, no peers are configured, work as master!"
,
pNode
->
vgId
);
nodeRole
=
TAOS_SYNC_ROLE_MASTER
;
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
...
...
@@ -412,10 +411,10 @@ void syncRecover(int64_t rid) {
// if take this node to unsync state, the whole system may not work
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->
ahandle
, nodeRole);
(*pNode->notifyRole)(pNode->
vgId
, nodeRole);
nodeVersion = 0;
pthread_mutex_lock(&
(pNode->mutex)
);
pthread_mutex_lock(&
pNode->mutex
);
for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
...
...
@@ -568,7 +567,7 @@ static void syncResetFlowCtrl(SSyncNode *pNode) {
}
if
(
pNode
->
notifyFlowCtrl
)
{
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
ahandle
,
0
);
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
vgId
,
0
);
}
}
...
...
@@ -631,7 +630,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
}
#endif
syncResetFlowCtrl
(
pNode
);
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
}
else
{
pPeer
=
pNode
->
peerInfo
[
index
];
sInfo
(
"%s, it shall work as master"
,
pPeer
->
id
);
...
...
@@ -662,7 +661,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
if
(
onlineNum
<=
replica
*
0
.
5
)
{
if
(
nodeRole
!=
TAOS_SYNC_ROLE_UNSYNCED
)
{
nodeRole
=
TAOS_SYNC_ROLE_UNSYNCED
;
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
sInfo
(
"vgId:%d, self change to unsynced state, online:%d replica:%d"
,
pNode
->
vgId
,
onlineNum
,
replica
);
}
}
else
{
...
...
@@ -675,7 +674,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
if
(
masterIndex
==
pNode
->
selfIndex
)
{
sError
(
"%s, peer is master, work as slave instead"
,
pTemp
->
id
);
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
}
}
}
...
...
@@ -692,7 +691,7 @@ static int32_t syncValidateMaster(SSyncPeer *pPeer) {
if
(
nodeRole
==
TAOS_SYNC_ROLE_MASTER
&&
nodeVersion
<
pPeer
->
version
)
{
sDebug
(
"%s, peer has higher sver:%"
PRIu64
", restart all peer connections"
,
pPeer
->
id
,
pPeer
->
version
);
nodeRole
=
TAOS_SYNC_ROLE_UNSYNCED
;
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
code
=
-
1
;
for
(
int32_t
index
=
0
;
index
<
pNode
->
replica
;
++
index
)
{
...
...
@@ -729,7 +728,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
}
else
{
sInfo
(
"%s, is master, work as slave, self sver:%"
PRIu64
,
pMaster
->
id
,
nodeVersion
);
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
}
}
else
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
&&
pMaster
==
pPeer
)
{
sDebug
(
"%s, is master, continue work as slave, self sver:%"
PRIu64
,
pMaster
->
id
,
nodeVersion
);
...
...
@@ -832,7 +831,7 @@ static void syncNotStarted(void *param, void *tmrId) {
SSyncPeer
*
pPeer
=
param
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
pPeer
->
timer
=
NULL
;
sInfo
(
"%s, sync connection is still not up, restart"
,
pPeer
->
id
);
syncRestartConnection
(
pPeer
);
...
...
@@ -843,7 +842,7 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
SSyncPeer
*
pPeer
=
param
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
syncRecoverFromMaster
(
pPeer
);
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
}
...
...
@@ -913,7 +912,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
)
{
// nodeVersion = pHead->version;
(
*
pNode
->
writeToCache
)(
pNode
->
ahandle
,
pHead
,
TAOS_QTYPE_FWD
,
NULL
);
(
*
pNode
->
writeToCache
)(
pNode
->
vgId
,
pHead
,
TAOS_QTYPE_FWD
,
NULL
);
}
else
{
if
(
nodeSStatus
!=
TAOS_SYNC_STATUS_INIT
)
{
syncSaveIntoBuffer
(
pPeer
,
pHead
);
...
...
@@ -969,7 +968,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
char
*
cont
=
buffer
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
int32_t
code
=
syncReadPeerMsg
(
pPeer
,
&
head
,
cont
);
...
...
@@ -1067,7 +1066,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
SSyncPeer
*
pPeer
=
param
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
sDebug
(
"%s, check peer connection"
,
pPeer
->
id
);
syncSetupPeerConnection
(
pPeer
);
...
...
@@ -1119,7 +1118,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
}
SSyncNode
*
pNode
=
*
ppNode
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
SSyncPeer
*
pPeer
;
for
(
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
...
...
@@ -1157,7 +1156,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
if
(
taosAcquireRef
(
tsSyncRefId
,
pNode
->
rid
)
==
NULL
)
return
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
sDebug
(
"%s, TCP link is broken since %s"
,
pPeer
->
id
,
strerror
(
errno
));
pPeer
->
peerFd
=
-
1
;
...
...
@@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
if
(
confirm
&&
pFwdInfo
->
confirmed
==
0
)
{
sTrace
(
"vgId:%d, forward is confirmed, hver:%"
PRIu64
" code:%x"
,
pNode
->
vgId
,
pFwdInfo
->
version
,
pFwdInfo
->
code
);
(
*
pNode
->
confirmForward
)(
pNode
->
ahandle
,
pFwdInfo
->
mhandle
,
pFwdInfo
->
code
);
(
*
pNode
->
confirmForward
)(
pNode
->
vgId
,
pFwdInfo
->
mhandle
,
pFwdInfo
->
code
);
pFwdInfo
->
confirmed
=
1
;
}
}
...
...
@@ -1263,7 +1262,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
int64_t
time
=
taosGetTimestampMs
();
if
(
pSyncFwds
->
fwds
>
0
)
{
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
for
(
int32_t
i
=
0
;
i
<
pSyncFwds
->
fwds
;
++
i
)
{
SFwdInfo
*
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
(
pSyncFwds
->
first
+
i
)
%
tsMaxFwdInfo
;
if
(
ABS
(
time
-
pFwdInfo
->
time
)
<
2000
)
break
;
...
...
@@ -1321,7 +1320,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
pSyncHead
->
len
=
sizeof
(
SWalHead
)
+
pWalHead
->
len
;
fwdLen
=
pSyncHead
->
len
+
sizeof
(
SSyncHead
);
// include the WAL and SYNC head
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
for
(
int32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
...
...
src/sync/src/syncRestore.c
浏览文件 @
5a7b938b
...
...
@@ -38,7 +38,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
while
(
1
)
{
name
[
0
]
=
0
;
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
ahandle
,
name
,
&
index
,
eindex
,
&
size
,
&
fversion
);
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
name
,
&
index
,
eindex
,
&
size
,
&
fversion
);
if
(
magic
==
0
)
break
;
snprintf
(
fname
,
sizeof
(
fname
),
"%s/%s"
,
pNode
->
path
,
name
);
...
...
@@ -84,7 +84,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// check the file info
sinfo
=
minfo
;
sDebug
(
"%s, get file info:%s"
,
pPeer
->
id
,
minfo
.
name
);
sinfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
ahandle
,
sinfo
.
name
,
&
sinfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
&
sinfo
.
size
,
sinfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
sinfo
.
name
,
&
sinfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
&
sinfo
.
size
,
&
sinfo
.
fversion
);
// if file not there or magic is not the same, file shall be synced
...
...
@@ -164,7 +164,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
}
lastVer
=
pHead
->
version
;
(
*
pNode
->
writeToCache
)(
pNode
->
ahandle
,
pHead
,
TAOS_QTYPE_WAL
,
NULL
);
(
*
pNode
->
writeToCache
)(
pNode
->
vgId
,
pHead
,
TAOS_QTYPE_WAL
,
NULL
);
}
if
(
code
<
0
)
{
...
...
@@ -179,7 +179,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SWalHead
*
pHead
=
(
SWalHead
*
)
offset
;
(
*
pNode
->
writeToCache
)(
pNode
->
ahandle
,
pHead
,
TAOS_QTYPE_FWD
,
NULL
);
(
*
pNode
->
writeToCache
)(
pNode
->
vgId
,
pHead
,
TAOS_QTYPE_FWD
,
NULL
);
offset
+=
pHead
->
len
+
sizeof
(
SWalHead
);
return
offset
;
...
...
@@ -276,7 +276,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
// if code > 0, data file is changed, notify app, and pass the version
if
(
code
>
0
&&
pNode
->
notifyFileSynced
)
{
if
((
*
pNode
->
notifyFileSynced
)(
pNode
->
ahandle
,
fversion
)
<
0
)
{
if
((
*
pNode
->
notifyFileSynced
)(
pNode
->
vgId
,
fversion
)
<
0
)
{
sError
(
"%s, app not in ready state"
,
pPeer
->
id
);
return
-
1
;
}
...
...
@@ -307,7 +307,7 @@ void *syncRestoreData(void *param) {
taosBlockSIGPIPE
();
__sync_fetch_and_add
(
&
tsSyncNum
,
1
);
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
TAOS_SYNC_ROLE_SYNCING
);
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
TAOS_SYNC_ROLE_SYNCING
);
if
(
syncOpenRecvBuffer
(
pNode
)
<
0
)
{
sError
(
"%s, failed to allocate recv buffer, restart connection"
,
pPeer
->
id
);
...
...
@@ -324,7 +324,7 @@ void *syncRestoreData(void *param) {
}
}
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
nodeSStatus
=
TAOS_SYNC_STATUS_INIT
;
taosClose
(
pPeer
->
syncFd
);
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
5a7b938b
...
...
@@ -108,7 +108,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
while
(
1
)
{
// retrieve file info
fileInfo
.
name
[
0
]
=
0
;
fileInfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
ahandle
,
fileInfo
.
name
,
&
fileInfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
fileInfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
fileInfo
.
name
,
&
fileInfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
&
fileInfo
.
size
,
&
fileInfo
.
fversion
);
// fileInfo.size = htonl(size);
...
...
@@ -354,7 +354,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
index
++
;
wname
[
0
]
=
0
;
code
=
(
*
pNode
->
getWalInfo
)(
pNode
->
ahandle
,
wname
,
&
index
);
code
=
(
*
pNode
->
getWalInfo
)(
pNode
->
vgId
,
wname
,
&
index
);
if
(
code
<
0
)
break
;
if
(
wname
[
0
]
==
0
)
{
code
=
0
;
...
...
@@ -382,7 +382,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
while
(
1
)
{
// retrieve wal info
wname
[
0
]
=
0
;
code
=
(
*
pNode
->
getWalInfo
)(
pNode
->
ahandle
,
wname
,
&
index
);
code
=
(
*
pNode
->
getWalInfo
)(
pNode
->
vgId
,
wname
,
&
index
);
if
(
code
<
0
)
break
;
// error
if
(
wname
[
0
]
==
0
)
{
// no wal file
sDebug
(
"%s, no wal file"
,
pPeer
->
id
);
...
...
@@ -487,10 +487,10 @@ void *syncRetrieveData(void *param) {
// if file is changed 3 times continuously, start flow control
pPeer
->
numOfRetrieves
++
;
if
(
pPeer
->
numOfRetrieves
>=
2
&&
pNode
->
notifyFlowCtrl
)
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
ahandle
,
4
<<
(
pPeer
->
numOfRetrieves
-
2
));
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
vgId
,
4
<<
(
pPeer
->
numOfRetrieves
-
2
));
}
else
{
pPeer
->
numOfRetrieves
=
0
;
if
(
pNode
->
notifyFlowCtrl
)
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
ahandle
,
0
);
if
(
pNode
->
notifyFlowCtrl
)
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
vgId
,
0
);
}
pPeer
->
fileChanged
=
0
;
...
...
src/sync/test/syncServer.c
浏览文件 @
5a7b938b
...
...
@@ -70,7 +70,7 @@ int writeIntoWal(SWalHead *pHead) {
return
0
;
}
void
confirmForward
(
void
*
ahandle
,
void
*
mhandle
,
int32_t
code
)
{
void
confirmForward
(
int32_t
vgId
,
void
*
mhandle
,
int32_t
code
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
mhandle
;
SWalHead
*
pHead
=
(
SWalHead
*
)(((
char
*
)
pMsg
->
pCont
)
-
sizeof
(
SWalHead
));
...
...
@@ -227,7 +227,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
taosWriteQitem
(
qhandle
,
TAOS_QTYPE_RPC
,
pTemp
);
}
uint32_t
getFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
)
{
uint32_t
getFileInfo
(
int32_t
vgId
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
)
{
uint32_t
magic
;
struct
stat
fstat
;
char
aname
[
280
];
...
...
@@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex
return
magic
;
}
int
getWalInfo
(
void
*
ahandle
,
char
*
name
,
int64_t
*
index
)
{
int
getWalInfo
(
int32_t
vgId
,
char
*
name
,
int64_t
*
index
)
{
struct
stat
fstat
;
char
aname
[
280
];
...
...
@@ -272,7 +272,7 @@ int getWalInfo(void *ahandle, char *name, int64_t *index) {
return
1
;
}
int
writeToCache
(
void
*
ahandle
,
void
*
data
,
int
type
)
{
int
writeToCache
(
int32_t
vgId
,
void
*
data
,
int
type
)
{
SWalHead
*
pHead
=
data
;
uDebug
(
"pkt from peer is received, ver:%"
PRIu64
" len:%d type:%d"
,
pHead
->
version
,
pHead
->
len
,
type
);
...
...
@@ -285,9 +285,9 @@ int writeToCache(void *ahandle, void *data, int type) {
return
0
;
}
void
confirmFwd
(
void
*
ahandle
,
int64_t
version
)
{
return
;
}
void
confirmFwd
(
int32_t
vgId
,
int64_t
version
)
{
return
;
}
void
notifyRole
(
void
*
ahandle
,
int8_t
r
)
{
void
notifyRole
(
int32_t
vgId
,
int8_t
r
)
{
role
=
r
;
printf
(
"current role:%s
\n
"
,
syncRole
[
role
]);
}
...
...
@@ -296,7 +296,6 @@ void initSync() {
pCfg
->
replica
=
1
;
pCfg
->
quorum
=
1
;
syncInfo
.
vgId
=
1
;
syncInfo
.
ahandle
=
&
syncInfo
;
syncInfo
.
getFileInfo
=
getFileInfo
;
syncInfo
.
getWalInfo
=
getWalInfo
;
syncInfo
.
writeToCache
=
writeToCache
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
5a7b938b
...
...
@@ -30,19 +30,21 @@
static
SHashObj
*
tsVnodesHash
;
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
);
static
int
vnodeProcessTsdbStatus
(
void
*
arg
,
int
status
,
int
eno
);
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
);
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
fileName
,
int64_t
*
fileId
);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
static
void
vnodeCtrlFlow
(
void
*
handle
,
int32_t
mseconds
);
static
int
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
);
static
int32_t
vnodeProcessTsdbStatus
(
void
*
arg
,
int32_t
status
,
int32_t
eno
);
static
uint32_t
vnodeGetFileInfo
(
int32_t
vgId
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
);
static
int32_t
vnodeGetWalInfo
(
int32_t
vgId
,
char
*
fileName
,
int64_t
*
fileId
);
static
void
vnodeNotifyRole
(
int32_t
vgId
,
int8_t
role
);
static
void
vnodeCtrlFlow
(
int32_t
vgId
,
int32_t
mseconds
);
static
int32_t
vnodeNotifyFileSynced
(
int32_t
vgId
,
uint64_t
fversion
);
static
void
vnodeConfirmForard
(
int32_t
vgId
,
void
*
wparam
,
int32_t
code
);
static
int32_t
vnodeWriteToCache
(
int32_t
vgId
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
);
#ifndef _SYNC
int64_t
syncStart
(
const
SSyncInfo
*
info
)
{
return
NULL
;
}
int32_t
syncForwardToPeer
(
int64_t
rid
,
void
*
pHead
,
void
*
mhandle
,
int
qtype
)
{
return
0
;
}
int32_t
syncForwardToPeer
(
int64_t
rid
,
void
*
pHead
,
void
*
mhandle
,
int
32_t
qtype
)
{
return
0
;
}
void
syncStop
(
int64_t
rid
)
{}
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
cfg
)
{
return
0
;
}
int
syncGetNodesRole
(
int64_t
rid
,
SNodesRole
*
cfg
)
{
return
0
;
}
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
cfg
)
{
return
0
;
}
int
32_t
syncGetNodesRole
(
int64_t
rid
,
SNodesRole
*
cfg
)
{
return
0
;
}
void
syncConfirmForward
(
int64_t
rid
,
uint64_t
version
,
int32_t
code
)
{}
#endif
...
...
@@ -55,13 +57,13 @@ char* vnodeStatus[] = {
};
int32_t
vnodeInitResources
()
{
int
code
=
syncInit
();
int
32_t
code
=
syncInit
();
if
(
code
!=
0
)
return
code
;
vnodeInitWriteFp
();
vnodeInitReadFp
();
tsVnodesHash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
true
);
tsVnodesHash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
tsVnodesHash
==
NULL
)
{
vError
(
"failed to init vnode list"
);
return
TSDB_CODE_VND_OUT_OF_MEMORY
;
...
...
@@ -173,8 +175,8 @@ int32_t vnodeDrop(int32_t vgId) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeAlter
(
void
*
param
,
SCreateVnodeMsg
*
pVnodeCfg
)
{
SVnodeObj
*
pVnode
=
param
;
int32_t
vnodeAlter
(
void
*
v
param
,
SCreateVnodeMsg
*
pVnodeCfg
)
{
SVnodeObj
*
pVnode
=
v
param
;
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg
...
...
@@ -325,16 +327,28 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
walRemoveAllOldFiles
(
pVnode
->
wal
);
walRenew
(
pVnode
->
wal
);
pVnode
->
qMgmt
=
qOpenQueryMgmt
(
pVnode
->
vgId
);
if
(
pVnode
->
qMgmt
==
NULL
)
{
vnodeCleanUp
(
pVnode
);
return
terrno
;
}
pVnode
->
events
=
NULL
;
vDebug
(
"vgId:%d, vnode is opened in %s, pVnode:%p"
,
pVnode
->
vgId
,
rootDir
,
pVnode
);
tsdbIncCommitRef
(
pVnode
->
vgId
);
taosHashPut
(
tsVnodesHash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
),
&
pVnode
,
sizeof
(
SVnodeObj
*
));
SSyncInfo
syncInfo
;
syncInfo
.
vgId
=
pVnode
->
vgId
;
syncInfo
.
version
=
pVnode
->
version
;
syncInfo
.
syncCfg
=
pVnode
->
syncCfg
;
sprintf
(
syncInfo
.
path
,
"%s"
,
rootDir
);
syncInfo
.
ahandle
=
pVnode
;
syncInfo
.
getWalInfo
=
vnodeGetWalInfo
;
syncInfo
.
getFileInfo
=
vnodeGetFileInfo
;
syncInfo
.
writeToCache
=
vnodeWriteTo
WQueu
e
;
syncInfo
.
confirmForward
=
dnodeSendRpcVWriteRsp
;
syncInfo
.
writeToCache
=
vnodeWriteTo
Cach
e
;
syncInfo
.
confirmForward
=
vnodeConfirmForard
;
syncInfo
.
notifyRole
=
vnodeNotifyRole
;
syncInfo
.
notifyFlowCtrl
=
vnodeCtrlFlow
;
syncInfo
.
notifyFileSynced
=
vnodeNotifyFileSynced
;
...
...
@@ -346,24 +360,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if
(
pVnode
->
sync
<=
0
)
{
vError
(
"vgId:%d, failed to open sync module, replica:%d reason:%s"
,
pVnode
->
vgId
,
pVnode
->
syncCfg
.
replica
,
tstrerror
(
terrno
));
vnodeRelease
(
pVnode
);
vnodeCleanUp
(
pVnode
);
return
terrno
;
}
#endif
pVnode
->
qMgmt
=
qOpenQueryMgmt
(
pVnode
->
vgId
);
if
(
pVnode
->
qMgmt
==
NULL
)
{
vnodeCleanUp
(
pVnode
);
return
terrno
;
}
#endif
pVnode
->
events
=
NULL
;
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
vDebug
(
"vgId:%d, vnode is opened in %s, pVnode:%p"
,
pVnode
->
vgId
,
rootDir
,
pVnode
);
tsdbIncCommitRef
(
pVnode
->
vgId
);
taosHashPut
(
tsVnodesHash
,
(
const
char
*
)
&
pVnode
->
vgId
,
sizeof
(
int32_t
),
(
char
*
)(
&
pVnode
),
sizeof
(
SVnodeObj
*
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -389,7 +392,7 @@ void vnodeRelease(void *vparam) {
assert
(
refCount
>=
0
);
if
(
refCount
>
0
)
{
if
(
pVnode
->
status
==
TAOS_VN_STATUS_RESET
&&
refCount
==
2
)
{
if
(
pVnode
->
status
==
TAOS_VN_STATUS_RESET
&&
refCount
<=
3
)
{
tsem_post
(
&
pVnode
->
sem
);
}
return
;
...
...
@@ -567,11 +570,11 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
)
{
// remove from hash, so new messages wont be consumed
taosHashRemove
(
tsVnodesHash
,
(
const
char
*
)
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
taosHashRemove
(
tsVnodesHash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_INIT
)
{
// it may be in updateing or reset state, then it shall wait
int
i
=
0
;
int
32_t
i
=
0
;
while
(
atomic_val_compare_exchange_8
(
&
pVnode
->
status
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_CLOSING
)
!=
TAOS_VN_STATUS_READY
)
{
if
(
++
i
%
1000
==
0
)
{
...
...
@@ -595,7 +598,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
}
// TODO: this is a simple implement
static
int
vnodeProcessTsdbStatus
(
void
*
arg
,
int
status
,
in
t
eno
)
{
static
int
32_t
vnodeProcessTsdbStatus
(
void
*
arg
,
int32_t
status
,
int32_
t
eno
)
{
SVnodeObj
*
pVnode
=
arg
;
if
(
eno
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -608,37 +611,59 @@ static int vnodeProcessTsdbStatus(void *arg, int status, int eno) {
if
(
status
==
TSDB_STATUS_COMMIT_START
)
{
pVnode
->
fversion
=
pVnode
->
version
;
vDebug
(
"vgId:%d, start commit, fver:%"
PRIu64
" vver:%"
PRIu64
,
pVnode
->
vgId
,
pVnode
->
fversion
,
pVnode
->
version
);
if
(
pVnode
->
status
==
TAOS_VN_STATUS_INIT
)
{
return
0
;
}
else
{
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_INIT
)
{
return
walRenew
(
pVnode
->
wal
);
}
return
0
;
}
if
(
status
==
TSDB_STATUS_COMMIT_OVER
)
{
vDebug
(
"vgId:%d, commit over, fver:%"
PRIu64
" vver:%"
PRIu64
,
pVnode
->
vgId
,
pVnode
->
fversion
,
pVnode
->
version
);
pVnode
->
isFull
=
0
;
walRemoveOneOldFile
(
pVnode
->
wal
);
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_INIT
)
{
walRemoveOneOldFile
(
pVnode
->
wal
);
}
return
vnodeSaveVersion
(
pVnode
);
}
return
0
;
}
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
static
uint32_t
vnodeGetFileInfo
(
int32_t
vgId
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
)
{
SVnodeObj
*
pVnode
=
ahandle
;
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
vError
(
"vgId:%d, vnode not found while get file info"
,
vgId
);
return
0
;
}
*
fversion
=
pVnode
->
fversion
;
return
tsdbGetFileInfo
(
pVnode
->
tsdb
,
name
,
index
,
eindex
,
size
);
uint32_t
ret
=
tsdbGetFileInfo
(
pVnode
->
tsdb
,
name
,
index
,
eindex
,
size
);
vnodeRelease
(
pVnode
);
return
ret
;
}
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
fileName
,
int64_t
*
fileId
)
{
SVnodeObj
*
pVnode
=
ahandle
;
return
walGetWalFile
(
pVnode
->
wal
,
fileName
,
fileId
);
static
int32_t
vnodeGetWalInfo
(
int32_t
vgId
,
char
*
fileName
,
int64_t
*
fileId
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
vError
(
"vgId:%d, vnode not found while get wal info"
,
vgId
);
return
-
1
;
}
int32_t
code
=
walGetWalFile
(
pVnode
->
wal
,
fileName
,
fileId
);
vnodeRelease
(
pVnode
);
return
code
;
}
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
)
{
SVnodeObj
*
pVnode
=
ahandle
;
static
void
vnodeNotifyRole
(
int32_t
vgId
,
int8_t
role
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
vError
(
"vgId:%d, vnode not found while notify role"
,
vgId
);
return
;
}
vInfo
(
"vgId:%d, sync role changed from %s to %s"
,
pVnode
->
vgId
,
syncRole
[
pVnode
->
role
],
syncRole
[
role
]);
pVnode
->
role
=
role
;
dnodeSendStatusMsgToMnode
();
...
...
@@ -648,17 +673,26 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
}
else
{
cqStop
(
pVnode
->
cq
);
}
vnodeRelease
(
pVnode
);
}
static
void
vnodeCtrlFlow
(
void
*
ahandle
,
int32_t
mseconds
)
{
SVnodeObj
*
pVnode
=
ahandle
;
static
void
vnodeCtrlFlow
(
int32_t
vgId
,
int32_t
mseconds
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
vError
(
"vgId:%d, vnode not found while ctrl flow"
,
vgId
);
return
;
}
if
(
pVnode
->
delayMs
!=
mseconds
)
{
pVnode
->
delayMs
=
mseconds
;
vDebug
(
"vgId:%d, sync flow control, mseconds:%d"
,
pVnode
->
vgId
,
mseconds
);
}
vnodeRelease
(
pVnode
);
}
static
int
vnodeResetTsdb
(
SVnodeObj
*
pVnode
)
{
static
int
32_t
vnodeResetTsdb
(
SVnodeObj
*
pVnode
)
{
char
rootDir
[
128
]
=
"
\0
"
;
sprintf
(
rootDir
,
"%s/tsdb"
,
pVnode
->
rootDir
);
...
...
@@ -668,11 +702,11 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) {
void
*
tsdb
=
pVnode
->
tsdb
;
pVnode
->
tsdb
=
NULL
;
// acquire vnode
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
if
(
refCount
>
2
)
{
if
(
refCount
>
3
)
{
tsem_wait
(
&
pVnode
->
sem
);
}
...
...
@@ -692,14 +726,44 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) {
return
0
;
}
static
int
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
)
{
SVnodeObj
*
pVnode
=
ahandle
;
static
int32_t
vnodeNotifyFileSynced
(
int32_t
vgId
,
uint64_t
fversion
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
vError
(
"vgId:%d, vnode not found while notify file synced"
,
vgId
);
return
0
;
}
pVnode
->
fversion
=
fversion
;
pVnode
->
version
=
fversion
;
vnodeSaveVersion
(
pVnode
);
vDebug
(
"vgId:%d, data file is synced, fver:%"
PRIu64
" vver:%"
PRIu64
,
pVnode
->
vgId
,
pVnode
->
fversion
,
pVnode
->
version
);
return
vnodeResetTsdb
(
pVnode
);
vDebug
(
"vgId:%d, data file is synced, fver:%"
PRIu64
" vver:%"
PRIu64
,
vgId
,
fversion
,
fversion
);
int32_t
code
=
vnodeResetTsdb
(
pVnode
);
vnodeRelease
(
pVnode
);
return
code
;
}
void
vnodeConfirmForard
(
int32_t
vgId
,
void
*
wparam
,
int32_t
code
)
{
void
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
vError
(
"vgId:%d, vnode not found while confirm forward"
,
vgId
);
return
;
}
dnodeSendRpcVWriteRsp
(
pVnode
,
wparam
,
code
);
vnodeRelease
(
pVnode
);
}
static
int32_t
vnodeWriteToCache
(
int32_t
vgId
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
vError
(
"vgId:%d, vnode not found while write to cache"
,
vgId
);
return
TSDB_CODE_VND_INVALID_VGROUP_ID
;
}
int32_t
code
=
vnodeWriteToWQueue
(
pVnode
,
wparam
,
qtype
,
rparam
);
vnodeRelease
(
pVnode
);
return
code
;
}
src/vnode/src/vnodeRead.c
浏览文件 @
5a7b938b
...
...
@@ -53,8 +53,7 @@ int32_t vnodeProcessRead(void *vparam, SVReadMsg *pRead) {
return
(
*
vnodeProcessReadMsgFp
[
msgType
])(
pVnode
,
pRead
);
}
static
int32_t
vnodeCheckRead
(
void
*
vparam
)
{
SVnodeObj
*
pVnode
=
vparam
;
static
int32_t
vnodeCheckRead
(
SVnodeObj
*
pVnode
)
{
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_READY
)
{
vDebug
(
"vgId:%d, vnode status is %s, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
vnodeStatus
[
pVnode
->
status
],
pVnode
->
refCount
,
pVnode
);
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
5a7b938b
...
...
@@ -48,10 +48,10 @@ void vnodeInitWriteFp(void) {
}
int32_t
vnodeProcessWrite
(
void
*
vparam
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
)
{
int32_t
code
=
0
;
SVnodeObj
*
pVnode
=
vparam
;
SWalHead
*
pHead
=
wparam
;
SRspRet
*
pRspRet
=
rparam
;
int32_t
code
=
0
;
SVnodeObj
*
pVnode
=
vparam
;
SWalHead
*
pHead
=
wparam
;
SRspRet
*
pRspRet
=
rparam
;
if
(
vnodeProcessWriteMsgFp
[
pHead
->
msgType
]
==
NULL
)
{
vError
(
"vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%"
PRIu64
,
pVnode
->
vgId
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录