Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
9d6e3776
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9d6e3776
编写于
11月 18, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-1898
上级
52e99a8a
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
86 addition
and
85 deletion
+86
-85
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+64
-66
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+1
-1
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+17
-14
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+1
-1
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+1
-1
src/wal/src/walWrite.c
src/wal/src/walWrite.c
+2
-2
未找到文件。
src/mnode/src/mnodeSdb.c
浏览文件 @
9d6e3776
...
...
@@ -166,7 +166,7 @@ static char *sdbGetKeyStr(SSdbTable *pTable, void *key) {
}
}
static
char
*
sdbGet
KeyStrFromObj
(
SSdbTable
*
pTable
,
void
*
key
)
{
static
char
*
sdbGet
ObjStr
(
SSdbTable
*
pTable
,
void
*
key
)
{
return
sdbGetKeyStr
(
pTable
,
sdbGetObjKey
(
pTable
,
key
));
}
...
...
@@ -176,18 +176,18 @@ static void *sdbGetTableFromId(int32_t tableId) {
static
int32_t
sdbInitWal
()
{
SWalCfg
walCfg
=
{.
vgId
=
1
,
.
walLevel
=
TAOS_WAL_FSYNC
,
.
keep
=
TAOS_WAL_KEEP
,
.
fsyncPeriod
=
0
};
char
temp
[
TSDB_FILENAME_LEN
]
;
char
temp
[
TSDB_FILENAME_LEN
]
=
{
0
}
;
sprintf
(
temp
,
"%s/wal"
,
tsMnodeDir
);
tsSdbObj
.
wal
=
walOpen
(
temp
,
&
walCfg
);
if
(
tsSdbObj
.
wal
==
NULL
)
{
sdbError
(
"
failed to open sdb
wal in %s"
,
tsMnodeDir
);
sdbError
(
"
vgId:1, failed to open
wal in %s"
,
tsMnodeDir
);
return
-
1
;
}
sdbInfo
(
"
open sdb
wal for restore"
);
sdbInfo
(
"
vgId:1, open
wal for restore"
);
int
code
=
walRestore
(
tsSdbObj
.
wal
,
NULL
,
sdbWrite
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"
failed to open wal for restore, reason:
%s"
,
tstrerror
(
code
));
sdbError
(
"
vgId:1, failed to open wal for restore since
%s"
,
tstrerror
(
code
));
return
-
1
;
}
return
0
;
...
...
@@ -205,10 +205,10 @@ static void sdbRestoreTables() {
totalRows
+=
pTable
->
numOfRows
;
numOfTables
++
;
sdbDebug
(
"
table:%s, is restored, numOfR
ows:%"
PRId64
,
pTable
->
tableName
,
pTable
->
numOfRows
);
sdbDebug
(
"
vgId:1, sdb:%s is restored, r
ows:%"
PRId64
,
pTable
->
tableName
,
pTable
->
numOfRows
);
}
sdbInfo
(
"
sdb is restored, ver:%"
PRId64
" totalRows:%d numOfT
ables:%d"
,
tsSdbObj
.
version
,
totalRows
,
numOfTables
);
sdbInfo
(
"
vgId:1, sdb is restored, mver:%"
PRIu64
" rows:%d t
ables:%d"
,
tsSdbObj
.
version
,
totalRows
,
numOfTables
);
}
void
sdbUpdateMnodeRoles
()
{
...
...
@@ -217,12 +217,12 @@ void sdbUpdateMnodeRoles() {
SNodesRole
roles
=
{
0
};
syncGetNodesRole
(
tsSdbObj
.
sync
,
&
roles
);
sdbInfo
(
"
update mnodes sync roles, total
:%d"
,
tsSdbObj
.
cfg
.
replica
);
sdbInfo
(
"
vgId:1, update mnodes roles, replica
:%d"
,
tsSdbObj
.
cfg
.
replica
);
for
(
int32_t
i
=
0
;
i
<
tsSdbObj
.
cfg
.
replica
;
++
i
)
{
SMnodeObj
*
pMnode
=
mnodeGetMnode
(
roles
.
nodeId
[
i
]);
if
(
pMnode
!=
NULL
)
{
pMnode
->
role
=
roles
.
role
[
i
];
sdbInfo
(
"mnode:%d, role:%s"
,
pMnode
->
mnodeId
,
mnodeGetMnodeRoleStr
(
pMnode
->
role
));
sdbInfo
(
"
vgId:1,
mnode:%d, role:%s"
,
pMnode
->
mnodeId
,
mnodeGetMnodeRoleStr
(
pMnode
->
role
));
if
(
pMnode
->
mnodeId
==
dnodeGetDnodeId
())
tsSdbObj
.
role
=
pMnode
->
role
;
mnodeDecMnodeRef
(
pMnode
);
}
...
...
@@ -242,7 +242,7 @@ static int32_t sdbGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) {
}
static
void
sdbNotifyRole
(
void
*
ahandle
,
int8_t
role
)
{
sdbInfo
(
"mnode role changed from %s to %s"
,
mnodeGetMnodeRoleStr
(
tsSdbObj
.
role
),
mnodeGetMnodeRoleStr
(
role
));
sdbInfo
(
"
vgId:1,
mnode role changed from %s to %s"
,
mnodeGetMnodeRoleStr
(
tsSdbObj
.
role
),
mnodeGetMnodeRoleStr
(
role
));
if
(
role
==
TAOS_SYNC_ROLE_MASTER
&&
tsSdbObj
.
role
!=
TAOS_SYNC_ROLE_MASTER
)
{
balanceReset
();
...
...
@@ -262,24 +262,21 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
int32_t
processedCount
=
atomic_add_fetch_32
(
&
pOper
->
processedCount
,
1
);
if
(
processedCount
<=
1
)
{
if
(
pMsg
!=
NULL
)
{
sdbDebug
(
"app:%p:%p, waiting for confirm this operation, count:%d result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
processedCount
,
tstrerror
(
code
));
sdbDebug
(
"vgId:1, msg:%p waiting for confirm, count:%d code:%x"
,
pMsg
,
processedCount
,
code
);
}
return
;
}
if
(
pMsg
!=
NULL
)
{
sdbDebug
(
"app:%p:%p, is confirmed and will do callback func, result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
tstrerror
(
code
));
sdbDebug
(
"vgId:1, msg:%p is confirmed, code:%x"
,
pMsg
,
code
);
}
// failed to forward, need revert insert
if
(
pOper
->
retCode
!=
TSDB_CODE_SUCCESS
)
{
SWalHead
*
pHead
=
(
void
*
)
pOper
+
sizeof
(
SSdbOper
)
+
SDB_SYNC_HACK
;
int32_t
action
=
pHead
->
msgType
%
10
;
sdbError
(
"table:%s record:%p:%s ver:%"
PRIu64
", action:%d failed to foward reason:%s"
,
((
SSdbTable
*
)
pOper
->
table
)
->
tableName
,
pOper
->
pObj
,
sdbGetKeyStr
(
pOper
->
table
,
pHead
->
cont
),
pHead
->
version
,
action
,
tstrerror
(
pOper
->
retCode
));
sdbError
(
"vgId:1, key:%p:%s hver:%"
PRIu64
" action:%d, failed to foward since %s"
,
pOper
->
pObj
,
sdbGetKeyStr
(
pOper
->
table
,
pHead
->
cont
),
pHead
->
version
,
action
,
tstrerror
(
pOper
->
retCode
));
if
(
action
==
SDB_ACTION_INSERT
)
{
// It's better to create a table in two stages, create it first and then set it success
//sdbDeleteHash(pOper->table, pOper);
...
...
@@ -314,11 +311,11 @@ void sdbUpdateAsync() {
void
sdbUpdateSync
(
void
*
pMnodes
)
{
SMnodeInfos
*
mnodes
=
pMnodes
;
if
(
!
mnodeIsRunning
())
{
mDebug
(
"mnode not start yet, update sync config later"
);
mDebug
(
"
vgId:1,
mnode not start yet, update sync config later"
);
return
;
}
mDebug
(
"update sync config in sync module, mnodes:%p"
,
pMnodes
);
mDebug
(
"
vgId:1,
update sync config in sync module, mnodes:%p"
,
pMnodes
);
SSyncCfg
syncCfg
=
{
0
};
int32_t
index
=
0
;
...
...
@@ -344,7 +341,7 @@ void sdbUpdateSync(void *pMnodes) {
}
sdbFreeIter
(
pIter
);
syncCfg
.
replica
=
index
;
mDebug
(
"mnodes info not input, use infos in sdb, numOfMnodes:%d"
,
syncCfg
.
replica
);
mDebug
(
"
vgId:1,
mnodes info not input, use infos in sdb, numOfMnodes:%d"
,
syncCfg
.
replica
);
}
else
{
for
(
index
=
0
;
index
<
mnodes
->
mnodeNum
;
++
index
)
{
SMnodeInfo
*
node
=
&
mnodes
->
mnodeInfos
[
index
];
...
...
@@ -353,7 +350,7 @@ void sdbUpdateSync(void *pMnodes) {
syncCfg
.
nodeInfo
[
index
].
nodePort
+=
TSDB_PORT_SYNC
;
}
syncCfg
.
replica
=
index
;
mDebug
(
"mnodes info input, numOfMnodes:%d"
,
syncCfg
.
replica
);
mDebug
(
"
vgId:1,
mnodes info input, numOfMnodes:%d"
,
syncCfg
.
replica
);
}
syncCfg
.
quorum
=
(
syncCfg
.
replica
==
1
)
?
1
:
2
;
...
...
@@ -367,18 +364,19 @@ void sdbUpdateSync(void *pMnodes) {
}
if
(
!
hasThisDnode
)
{
sdbDebug
(
"update sync config, this dnode not exist"
);
sdbDebug
(
"
vgId:1,
update sync config, this dnode not exist"
);
return
;
}
if
(
memcmp
(
&
syncCfg
,
&
tsSdbObj
.
cfg
,
sizeof
(
SSyncCfg
))
==
0
)
{
sdbDebug
(
"update sync config, info not changed"
);
sdbDebug
(
"
vgId:1,
update sync config, info not changed"
);
return
;
}
sdbInfo
(
"work as mnode, replica:%d"
,
syncCfg
.
replica
);
sdbInfo
(
"
vgId:1,
work as mnode, replica:%d"
,
syncCfg
.
replica
);
for
(
int32_t
i
=
0
;
i
<
syncCfg
.
replica
;
++
i
)
{
sdbInfo
(
"mnode:%d, %s:%d"
,
syncCfg
.
nodeInfo
[
i
].
nodeId
,
syncCfg
.
nodeInfo
[
i
].
nodeFqdn
,
syncCfg
.
nodeInfo
[
i
].
nodePort
);
sdbInfo
(
"vgId:1, mnode:%d, %s:%d"
,
syncCfg
.
nodeInfo
[
i
].
nodeId
,
syncCfg
.
nodeInfo
[
i
].
nodeFqdn
,
syncCfg
.
nodeInfo
[
i
].
nodePort
);
}
SSyncInfo
syncInfo
=
{
0
};
...
...
@@ -429,7 +427,7 @@ void sdbCleanUp() {
tsSdbObj
.
status
=
SDB_STATUS_CLOSING
;
sdbCleanupWriteWorker
();
sdbDebug
(
"
sdb will be closed, ver:%"
PRId
64
,
tsSdbObj
.
version
);
sdbDebug
(
"
vgId:1, sdb will be closed, mver:%"
PRIu
64
,
tsSdbObj
.
version
);
if
(
tsSdbObj
.
sync
)
{
syncStop
(
tsSdbObj
.
sync
);
...
...
@@ -450,7 +448,7 @@ void sdbIncRef(void *handle, void *pObj) {
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pObj
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_add_fetch_32
(
pRefCount
,
1
);
sdbTrace
(
"
add ref to table:%s record:%p:%s:%d"
,
pTable
->
tableName
,
pObj
,
sdbGetKeyStrFromObj
(
pTable
,
pObj
),
refCount
);
sdbTrace
(
"
vgId:1, sdb:%s, inc ref to key:%p:%s:%d"
,
pTable
->
tableName
,
pObj
,
sdbGetObjStr
(
pTable
,
pObj
),
refCount
);
}
void
sdbDecRef
(
void
*
handle
,
void
*
pObj
)
{
...
...
@@ -459,11 +457,11 @@ void sdbDecRef(void *handle, void *pObj) {
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pObj
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_sub_fetch_32
(
pRefCount
,
1
);
sdbTrace
(
"
def ref of table:%s record:%p:%s:%d"
,
pTable
->
tableName
,
pObj
,
sdbGetKeyStrFromObj
(
pTable
,
pObj
),
refCount
);
sdbTrace
(
"
vgId:1, sdb:%s, dec ref to key:%p:%s:%d"
,
pTable
->
tableName
,
pObj
,
sdbGetObjStr
(
pTable
,
pObj
),
refCount
);
int32_t
*
updateEnd
=
pObj
+
pTable
->
refCountPos
-
4
;
if
(
refCount
<=
0
&&
*
updateEnd
)
{
sdbTrace
(
"
table:%s, record:%p:%s:%d is destroyed"
,
pTable
->
tableName
,
pObj
,
sdbGetKeyStrFromObj
(
pTable
,
pObj
),
refCount
);
sdbTrace
(
"
vgId:1, sdb:%s, key:%p:%s:%d destroyed"
,
pTable
->
tableName
,
pObj
,
sdbGetObjStr
(
pTable
,
pObj
),
refCount
);
SSdbOper
oper
=
{.
pObj
=
pObj
};
(
*
pTable
->
destroyFp
)(
&
oper
);
}
...
...
@@ -523,13 +521,13 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
atomic_add_fetch_32
(
&
pTable
->
autoIndex
,
1
);
}
sdbDebug
(
"
table:%s, insert record:%s to hash, rowSize:%d numOfR
ows:%"
PRId64
", msg:%p"
,
pTable
->
tableName
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
),
pOper
->
rowSize
,
pTable
->
numOfRows
,
pOper
->
pMsg
);
sdbDebug
(
"
vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d r
ows:%"
PRId64
", msg:%p"
,
pTable
->
tableName
,
sdbGet
ObjStr
(
pTable
,
pOper
->
pObj
),
pOper
->
rowSize
,
pTable
->
numOfRows
,
pOper
->
pMsg
);
int32_t
code
=
(
*
pTable
->
insertFp
)(
pOper
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"
table:%s, failed to insert record
:%s to hash, remove it"
,
pTable
->
tableName
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
));
sdbError
(
"
vgId:1, sdb:%s, failed to insert key
:%s to hash, remove it"
,
pTable
->
tableName
,
sdbGet
ObjStr
(
pTable
,
pOper
->
pObj
));
sdbDeleteHash
(
pTable
,
pOper
);
}
...
...
@@ -540,8 +538,8 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
int32_t
*
updateEnd
=
pOper
->
pObj
+
pTable
->
refCountPos
-
4
;
bool
set
=
atomic_val_compare_exchange_32
(
updateEnd
,
0
,
1
)
==
0
;
if
(
!
set
)
{
sdbError
(
"
table:%s, failed to delete record
:%s from hash, for it already removed"
,
pTable
->
tableName
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
));
sdbError
(
"
vgId:1, sdb:%s, failed to delete key
:%s from hash, for it already removed"
,
pTable
->
tableName
,
sdbGet
ObjStr
(
pTable
,
pOper
->
pObj
));
return
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
;
}
...
...
@@ -559,8 +557,8 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
atomic_sub_fetch_32
(
&
pTable
->
numOfRows
,
1
);
sdbDebug
(
"
table:%s, delete record
:%s from hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
tableName
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
pOper
->
pMsg
);
sdbDebug
(
"
vgId:1, sdb:%s, delete key
:%s from hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
tableName
,
sdbGet
ObjStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
pOper
->
pMsg
);
sdbDecRef
(
pTable
,
pOper
->
pObj
);
...
...
@@ -568,8 +566,8 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
}
static
int32_t
sdbUpdateHash
(
SSdbTable
*
pTable
,
SSdbOper
*
pOper
)
{
sdbDebug
(
"
table:%s, update record
:%s in hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
tableName
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
pOper
->
pMsg
);
sdbDebug
(
"
vgId:1, sdb:%s, update key
:%s in hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
tableName
,
sdbGet
ObjStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
pOper
->
pMsg
);
(
*
pTable
->
updateFp
)(
pOper
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -594,12 +592,12 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
// for data from WAL or forward, version may be smaller
if
(
pHead
->
version
<=
tsSdbObj
.
version
)
{
pthread_mutex_unlock
(
&
tsSdbObj
.
mutex
);
sdbDebug
(
"
table:%s, failed to restore %s record:%s from source(%d), ver:%"
PRId64
" too large, sdb ver:%"
PRId
64
,
sdbDebug
(
"
vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%"
PRIu64
" too large, mver:%"
PRIu
64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
type
,
pHead
->
version
,
tsSdbObj
.
version
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
pHead
->
version
!=
tsSdbObj
.
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdbObj
.
mutex
);
sdbError
(
"
table:%s, failed to restore %s record:%s from source(%d), ver:%"
PRId64
" too large, sdb ver:%"
PRId
64
,
sdbError
(
"
vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%"
PRIu64
" too large, mver:%"
PRIu
64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
type
,
pHead
->
version
,
tsSdbObj
.
version
);
return
TSDB_CODE_SYN_INVALID_VERSION
;
}
else
{
...
...
@@ -623,19 +621,19 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
if
(
syncCode
<=
0
)
pOper
->
processedCount
=
1
;
if
(
syncCode
<
0
)
{
sdbError
(
"
table:%s, failed to forward request, result:%s action:%s record:%s ver:%"
PRId
64
", msg:%p"
,
pTable
->
tableName
,
sdbError
(
"
vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%"
PRIu
64
", msg:%p"
,
pTable
->
tableName
,
tstrerror
(
syncCode
),
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
pOper
->
pMsg
);
}
else
if
(
syncCode
>
0
)
{
sdbDebug
(
"
table:%s, forward request is sent, action:%s record:%s ver:%"
PRId
64
", msg:%p"
,
pTable
->
tableName
,
sdbDebug
(
"
vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%"
PRIu
64
", msg:%p"
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
pOper
->
pMsg
);
}
else
{
sdbTrace
(
"
table:%s, no need to send fwd request, action:%s record:%s ver:%"
PRId
64
", msg:%p"
,
pTable
->
tableName
,
sdbTrace
(
"
vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%"
PRIu
64
", msg:%p"
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
pOper
->
pMsg
);
}
return
syncCode
;
}
sdbDebug
(
"
table:%s, record from wal/fwd is disposed, action:%s record:%s ver:%"
PRId
64
,
pTable
->
tableName
,
sdbDebug
(
"
vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%"
PRIu
64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
// even it is WAL/FWD, it shall be called to update version in sync
...
...
@@ -649,7 +647,7 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
}
else
if
(
action
==
SDB_ACTION_DELETE
)
{
void
*
pRow
=
sdbGetRowMeta
(
pTable
,
pHead
->
cont
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"
table
:%s, object:%s not exist in hash, ignore delete action"
,
pTable
->
tableName
,
sdbDebug
(
"
vgId:1, sdb
:%s, object:%s not exist in hash, ignore delete action"
,
pTable
->
tableName
,
sdbGetKeyStr
(
pTable
,
pHead
->
cont
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -658,7 +656,7 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
}
else
if
(
action
==
SDB_ACTION_UPDATE
)
{
void
*
pRow
=
sdbGetRowMeta
(
pTable
,
pHead
->
cont
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"
table
:%s, object:%s not exist in hash, ignore update action"
,
pTable
->
tableName
,
sdbDebug
(
"
vgId:1, sdb
:%s, object:%s not exist in hash, ignore update action"
,
pTable
->
tableName
,
sdbGetKeyStr
(
pTable
,
pHead
->
cont
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -675,8 +673,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
if
(
pTable
==
NULL
)
return
TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE
;
if
(
sdbGetRowFromObj
(
pTable
,
pOper
->
pObj
))
{
sdbError
(
"
table:%s, failed to insert record
:%s, already exist"
,
pTable
->
tableName
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
));
sdbError
(
"
vgId:1, sdb:%s, failed to insert key
:%s, already exist"
,
pTable
->
tableName
,
sdbGet
ObjStr
(
pTable
,
pOper
->
pObj
));
sdbDecRef
(
pTable
,
pOper
->
pObj
);
return
TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE
;
}
...
...
@@ -692,7 +690,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
int32_t
code
=
sdbInsertHash
(
pTable
,
pOper
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"
table
:%s, failed to insert into hash"
,
pTable
->
tableName
);
sdbError
(
"
vgId:1, sdb
:%s, failed to insert into hash"
,
pTable
->
tableName
);
return
code
;
}
...
...
@@ -727,8 +725,8 @@ int32_t sdbInsertRowImp(SSdbOper *pOper) {
memcpy
(
pNewOper
,
pOper
,
sizeof
(
SSdbOper
));
if
(
pNewOper
->
pMsg
!=
NULL
)
{
sdbDebug
(
"
app:%p:%p, table:%s record
:%p:%s, insert action is add to sdb queue"
,
pNewOper
->
pMsg
->
rpcMsg
.
ahandle
,
pNewOper
->
pMsg
,
pTable
->
tableName
,
pOper
->
pObj
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
));
sdbDebug
(
"
vgId:1, ahandle:%p msg:%p, sdb:%s key
:%p:%s, insert action is add to sdb queue"
,
pNewOper
->
pMsg
->
rpcMsg
.
ahandle
,
pNewOper
->
pMsg
,
pTable
->
tableName
,
pOper
->
pObj
,
sdbGet
ObjStr
(
pTable
,
pOper
->
pObj
));
}
sdbIncRef
(
pNewOper
->
table
,
pNewOper
->
pObj
);
...
...
@@ -751,7 +749,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
void
*
pRow
=
sdbGetRowMetaFromObj
(
pTable
,
pOper
->
pObj
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"
table
:%s, record is not there, delete failed"
,
pTable
->
tableName
);
sdbDebug
(
"
vgId:1, sdb
:%s, record is not there, delete failed"
,
pTable
->
tableName
);
return
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
;
}
...
...
@@ -759,7 +757,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
int32_t
code
=
sdbDeleteHash
(
pTable
,
pOper
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"
table
:%s, failed to delete from hash"
,
pTable
->
tableName
);
sdbError
(
"
vgId:1, sdb
:%s, failed to delete from hash"
,
pTable
->
tableName
);
sdbDecRef
(
pTable
,
pOper
->
pObj
);
return
code
;
}
...
...
@@ -795,8 +793,8 @@ int32_t sdbDeleteRowImp(SSdbOper *pOper) {
memcpy
(
pNewOper
,
pOper
,
sizeof
(
SSdbOper
));
if
(
pNewOper
->
pMsg
!=
NULL
)
{
sdbDebug
(
"
app:%p:%p, table:%s record
:%p:%s, delete action is add to sdb queue"
,
pNewOper
->
pMsg
->
rpcMsg
.
ahandle
,
pNewOper
->
pMsg
,
pTable
->
tableName
,
pOper
->
pObj
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
));
sdbDebug
(
"
vgId:1, ahandle:%p msg:%p, sdb:%s key
:%p:%s, delete action is add to sdb queue"
,
pNewOper
->
pMsg
->
rpcMsg
.
ahandle
,
pNewOper
->
pMsg
,
pTable
->
tableName
,
pOper
->
pObj
,
sdbGet
ObjStr
(
pTable
,
pOper
->
pObj
));
}
taosWriteQitem
(
tsSdbWriteQueue
,
TAOS_QTYPE_RPC
,
pNewOper
);
...
...
@@ -810,13 +808,13 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
void
*
pRow
=
sdbGetRowMetaFromObj
(
pTable
,
pOper
->
pObj
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"
table
:%s, record is not there, update failed"
,
pTable
->
tableName
);
sdbDebug
(
"
vgId:1, sdb
:%s, record is not there, update failed"
,
pTable
->
tableName
);
return
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
;
}
int32_t
code
=
sdbUpdateHash
(
pTable
,
pOper
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"
table
:%s, failed to update hash"
,
pTable
->
tableName
);
sdbError
(
"
vgId:1, sdb
:%s, failed to update hash"
,
pTable
->
tableName
);
return
code
;
}
...
...
@@ -850,8 +848,8 @@ int32_t sdbUpdateRowImp(SSdbOper *pOper) {
memcpy
(
pNewOper
,
pOper
,
sizeof
(
SSdbOper
));
if
(
pNewOper
->
pMsg
!=
NULL
)
{
sdbDebug
(
"
app:%p:%p, table:%s record
:%p:%s, update action is add to sdb queue"
,
pNewOper
->
pMsg
->
rpcMsg
.
ahandle
,
pNewOper
->
pMsg
,
pTable
->
tableName
,
pOper
->
pObj
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
));
sdbDebug
(
"
vgId:1, ahandle:%p msg:%p, sdb:%s key
:%p:%s, update action is add to sdb queue"
,
pNewOper
->
pMsg
->
rpcMsg
.
ahandle
,
pNewOper
->
pMsg
,
pTable
->
tableName
,
pOper
->
pObj
,
sdbGet
ObjStr
(
pTable
,
pOper
->
pObj
));
}
sdbIncRef
(
pNewOper
->
table
,
pNewOper
->
pObj
);
...
...
@@ -948,7 +946,7 @@ void sdbCloseTable(void *handle) {
taosHashCleanup
(
pTable
->
iHandle
);
pthread_mutex_destroy
(
&
pTable
->
mutex
);
sdbDebug
(
"
table
:%s, is closed, numOfTables:%d"
,
pTable
->
tableName
,
tsSdbObj
.
numOfTables
);
sdbDebug
(
"
vgId:1, sdb
:%s, is closed, numOfTables:%d"
,
pTable
->
tableName
,
tsSdbObj
.
numOfTables
);
free
(
pTable
);
}
...
...
@@ -964,7 +962,7 @@ int32_t sdbInitWriteWorker() {
sdbAllocWriteQueue
();
mInfo
(
"sdb write is opened"
);
mInfo
(
"
vgId:1,
sdb write is opened"
);
return
0
;
}
...
...
@@ -986,7 +984,7 @@ void sdbCleanupWriteWorker() {
sdbFreeWritequeue
();
tfree
(
tsSdbPool
.
writeWorker
);
mInfo
(
"sdb write is closed"
);
mInfo
(
"
vgId:1,
sdb write is closed"
);
}
int32_t
sdbAllocWriteQueue
()
{
...
...
@@ -1072,7 +1070,7 @@ static void *sdbWorkerFp(void *param) {
pOper
->
processedCount
=
1
;
pHead
=
(
void
*
)
pOper
+
sizeof
(
SSdbOper
)
+
SDB_SYNC_HACK
;
if
(
pOper
->
pMsg
!=
NULL
)
{
sdbDebug
(
"
app:%p:%p, table:%s record:%p:%s
ver:%"
PRIu64
", will be processed in sdb queue"
,
sdbDebug
(
"
vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s h
ver:%"
PRIu64
", will be processed in sdb queue"
,
pOper
->
pMsg
->
rpcMsg
.
ahandle
,
pOper
->
pMsg
,
((
SSdbTable
*
)
pOper
->
table
)
->
tableName
,
pOper
->
pObj
,
sdbGetKeyStr
(
pOper
->
table
,
pHead
->
cont
),
pHead
->
version
);
}
...
...
src/sync/inc/syncInt.h
浏览文件 @
9d6e3776
...
...
@@ -106,7 +106,7 @@ typedef struct {
int8_t
nacks
;
int8_t
confirmed
;
int32_t
code
;
uint64_t
time
;
int64_t
time
;
}
SFwdInfo
;
typedef
struct
{
...
...
src/sync/src/syncMain.c
浏览文件 @
9d6e3776
...
...
@@ -689,7 +689,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
if
(
pMaster
)
{
// master is there
pNode
->
pMaster
=
pMaster
;
sDebug
(
"%s, it is the master, ver:%"
PRIu64
,
pMaster
->
id
,
pMaster
->
version
);
sDebug
(
"%s, it is the master,
s
ver:%"
PRIu64
,
pMaster
->
id
,
pMaster
->
version
);
if
(
syncValidateMaster
(
pPeer
)
<
0
)
return
;
...
...
@@ -697,7 +697,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
if
(
nodeVersion
<
pMaster
->
version
)
{
syncRequired
=
1
;
}
else
{
sInfo
(
"%s is master, work as slave, ver:%"
PRIu64
,
pMaster
->
id
,
pMaster
->
version
);
sInfo
(
"%s is master, work as slave,
s
ver:%"
PRIu64
,
pMaster
->
id
,
pMaster
->
version
);
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
}
...
...
@@ -854,7 +854,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
SFwdInfo
*
pFwdInfo
;
sDebug
(
"%s, forward-rsp is received, code:%x ver:%"
PRIu64
,
pPeer
->
id
,
pFwdRsp
->
code
,
pFwdRsp
->
version
);
sDebug
(
"%s, forward-rsp is received, code:%x
h
ver:%"
PRIu64
,
pPeer
->
id
,
pFwdRsp
->
code
,
pFwdRsp
->
version
);
SFwdInfo
*
pFirst
=
pSyncFwds
->
fwdInfo
+
pSyncFwds
->
first
;
if
(
pFirst
->
version
<=
pFwdRsp
->
version
&&
pSyncFwds
->
fwds
>
0
)
{
...
...
@@ -891,7 +891,7 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SPeersStatus
*
pPeersStatus
=
(
SPeersStatus
*
)
cont
;
sDebug
(
"%s, status msg is received, self:%s
ver:%"
PRIu64
" peer:%s
ver:%"
PRIu64
", ack:%d"
,
pPeer
->
id
,
sDebug
(
"%s, status msg is received, self:%s
sver:%"
PRIu64
" peer:%s s
ver:%"
PRIu64
", ack:%d"
,
pPeer
->
id
,
syncRole
[
nodeRole
],
nodeVersion
,
syncRole
[
pPeersStatus
->
role
],
pPeersStatus
->
version
,
pPeersStatus
->
ack
);
pPeer
->
version
=
pPeersStatus
->
version
;
...
...
@@ -979,7 +979,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
int32_t
retLen
=
write
(
pPeer
->
peerFd
,
msg
,
statusMsgLen
);
if
(
retLen
==
statusMsgLen
)
{
sDebug
(
"%s, status msg is sent, self:%s ver:%"
PRIu64
", ack:%d"
,
pPeer
->
id
,
syncRole
[
pPeersStatus
->
role
],
sDebug
(
"%s, status msg is sent, self:%s
s
ver:%"
PRIu64
", ack:%d"
,
pPeer
->
id
,
syncRole
[
pPeersStatus
->
role
],
pPeersStatus
->
version
,
pPeersStatus
->
ack
);
}
else
{
sDebug
(
"%s, failed to send status msg, restart"
,
pPeer
->
id
);
...
...
@@ -1154,7 +1154,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
pFwdInfo
->
time
=
time
;
pSyncFwds
->
fwds
++
;
sDebug
(
"vgId:%d, fwd info is saved, ver:%"
PRIu64
" fwds:%d "
,
pNode
->
vgId
,
version
,
pSyncFwds
->
fwds
);
sDebug
(
"vgId:%d, fwd info is saved,
h
ver:%"
PRIu64
" fwds:%d "
,
pNode
->
vgId
,
version
,
pSyncFwds
->
fwds
);
}
static
void
syncRemoveConfirmedFwdInfo
(
SSyncNode
*
pNode
)
{
...
...
@@ -1168,7 +1168,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
pSyncFwds
->
first
=
(
pSyncFwds
->
first
+
1
)
%
tsMaxFwdInfo
;
pSyncFwds
->
fwds
--
;
if
(
pSyncFwds
->
fwds
==
0
)
pSyncFwds
->
first
=
pSyncFwds
->
last
;
// sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d",
// sDebug("vgId:%d, fwd info is removed,
h
ver:%d, fwds:%d",
// pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
memset
(
pFwdInfo
,
0
,
sizeof
(
SFwdInfo
));
}
...
...
@@ -1191,7 +1191,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
}
if
(
confirm
&&
pFwdInfo
->
confirmed
==
0
)
{
sDebug
(
"vgId:%d, forward is confirmed, ver:%"
PRIu64
" code:%x"
,
pNode
->
vgId
,
pFwdInfo
->
version
,
pFwdInfo
->
code
);
sDebug
(
"vgId:%d, forward is confirmed,
h
ver:%"
PRIu64
" code:%x"
,
pNode
->
vgId
,
pFwdInfo
->
version
,
pFwdInfo
->
code
);
(
*
pNode
->
confirmForward
)(
pNode
->
ahandle
,
pFwdInfo
->
mhandle
,
pFwdInfo
->
code
);
pFwdInfo
->
confirmed
=
1
;
}
...
...
@@ -1204,14 +1204,17 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
if
(
pSyncFwds
)
{
;
uint64_t
time
=
taosGetTimestampMs
();
if
(
pSyncFwds
)
{
int64_t
time
=
taosGetTimestampMs
();
if
(
pSyncFwds
->
fwds
>
0
)
{
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
for
(
int32_t
i
=
0
;
i
<
pSyncFwds
->
fwds
;
++
i
)
{
SFwdInfo
*
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
(
pSyncFwds
->
first
+
i
)
%
tsMaxFwdInfo
;
if
(
time
-
pFwdInfo
->
time
<
2000
)
break
;
if
(
ABS
(
time
-
pFwdInfo
->
time
)
<
2000
)
break
;
sDebug
(
"vgId:%d, forward info expired, hver:%"
PRIu64
" curtime:%"
PRIu64
" savetime:%"
PRIu64
,
pNode
->
vgId
,
pFwdInfo
->
version
,
time
,
pFwdInfo
->
time
);
syncProcessFwdAck
(
pNode
,
pFwdInfo
,
TSDB_CODE_RPC_NETWORK_UNAVAIL
);
}
...
...
@@ -1234,7 +1237,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if
(
pWalHead
->
version
>
nodeVersion
+
1
)
{
sError
(
"vgId:%d, hver:%"
PRIu64
", inconsistent with ver:%"
PRIu64
,
pNode
->
vgId
,
pWalHead
->
version
,
nodeVersion
);
sError
(
"vgId:%d, hver:%"
PRIu64
", inconsistent with
s
ver:%"
PRIu64
,
pNode
->
vgId
,
pWalHead
->
version
,
nodeVersion
);
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
)
{
sInfo
(
"vgId:%d, restart connection"
,
pNode
->
vgId
);
for
(
int32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
...
...
@@ -1277,9 +1280,9 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
int32_t
retLen
=
write
(
pPeer
->
peerFd
,
pSyncHead
,
fwdLen
);
if
(
retLen
==
fwdLen
)
{
sDebug
(
"%s, forward is sent, ver:%"
PRIu64
" contLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
pWalHead
->
len
);
sDebug
(
"%s, forward is sent,
h
ver:%"
PRIu64
" contLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
pWalHead
->
len
);
}
else
{
sError
(
"%s, failed to forward, ver:%"
PRIu64
" retLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
retLen
);
sError
(
"%s, failed to forward,
h
ver:%"
PRIu64
" retLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
retLen
);
syncRestartConnection
(
pPeer
);
}
}
...
...
src/sync/src/syncRestore.c
浏览文件 @
9d6e3776
...
...
@@ -214,7 +214,7 @@ int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
memcpy
(
pRecv
->
offset
,
pHead
,
len
);
pRecv
->
offset
+=
len
;
pRecv
->
forwards
++
;
sDebug
(
"%s, fwd is saved into queue, ver:%"
PRIu64
" fwds:%d"
,
pPeer
->
id
,
pHead
->
version
,
pRecv
->
forwards
);
sDebug
(
"%s, fwd is saved into queue,
h
ver:%"
PRIu64
" fwds:%d"
,
pPeer
->
id
,
pHead
->
version
,
pRecv
->
forwards
);
}
else
{
sError
(
"%s, buffer size:%d is too small"
,
pPeer
->
id
,
pRecv
->
bufferSize
);
pRecv
->
code
=
-
1
;
// set error code
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
9d6e3776
...
...
@@ -268,7 +268,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
break
;
}
sDebug
(
"%s, last wal is forwarded, ver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
sDebug
(
"%s, last wal is forwarded,
h
ver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
int32_t
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
pHead
,
wsize
);
if
(
ret
!=
wsize
)
break
;
pPeer
->
sversion
=
pHead
->
version
;
...
...
src/wal/src/walWrite.c
浏览文件 @
9d6e3776
...
...
@@ -144,9 +144,9 @@ void walFsync(void *handle, bool forceFsync) {
if
(
pWal
==
NULL
||
pWal
->
fd
<
0
)
return
;
if
(
forceFsync
||
(
pWal
->
level
==
TAOS_WAL_FSYNC
&&
pWal
->
fsyncPeriod
==
0
))
{
wTrace
(
"vgId:%d, file
:%s, do fsync"
,
pWal
->
vgId
,
pWal
->
name
);
wTrace
(
"vgId:%d, file
Id:%"
PRId64
", do fsync"
,
pWal
->
vgId
,
pWal
->
fileId
);
if
(
fsync
(
pWal
->
fd
)
<
0
)
{
wError
(
"vgId:%d, file
:%s, fsync failed since %s"
,
pWal
->
vgId
,
pWal
->
name
,
strerror
(
errno
));
wError
(
"vgId:%d, file
Id:%"
PRId64
", fsync failed since %s"
,
pWal
->
vgId
,
pWal
->
fileId
,
strerror
(
errno
));
}
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录