Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
cf5bcc4b
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cf5bcc4b
编写于
11月 18, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-2046
上级
c1e1dd64
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
95 addition
and
94 deletion
+95
-94
src/mnode/inc/mnodeSdb.h
src/mnode/inc/mnodeSdb.h
+2
-2
src/mnode/src/mnodeAcct.c
src/mnode/src/mnodeAcct.c
+6
-6
src/mnode/src/mnodeCluster.c
src/mnode/src/mnodeCluster.c
+6
-6
src/mnode/src/mnodeDb.c
src/mnode/src/mnodeDb.c
+4
-4
src/mnode/src/mnodeDnode.c
src/mnode/src/mnodeDnode.c
+4
-4
src/mnode/src/mnodeMnode.c
src/mnode/src/mnodeMnode.c
+4
-4
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+55
-54
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+4
-4
src/mnode/src/mnodeUser.c
src/mnode/src/mnodeUser.c
+6
-6
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+4
-4
未找到文件。
src/mnode/inc/mnodeSdb.h
浏览文件 @
cf5bcc4b
...
...
@@ -63,11 +63,11 @@ typedef struct SSWriteMsg {
}
SSWriteMsg
;
typedef
struct
{
char
*
tableN
ame
;
char
*
n
ame
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
int32_t
refCountPos
;
ESdbTable
tableI
d
;
ESdbTable
i
d
;
ESdbKey
keyType
;
int32_t
(
*
fpInsert
)(
SSWriteMsg
*
pWrite
);
int32_t
(
*
fpDelete
)(
SSWriteMsg
*
pWrite
);
...
...
src/mnode/src/mnodeAcct.c
浏览文件 @
cf5bcc4b
...
...
@@ -99,9 +99,9 @@ int32_t mnodeInitAccts() {
SAcctObj
tObj
;
tsAcctUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_ACCOUNT
,
.
tableName
=
"accounts"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_ACCOUNT
,
.
name
=
"accounts"
,
.
hashSessions
=
TSDB_DEFAULT_ACCOUNTS_HASH_SIZE
,
.
maxRowSize
=
tsAcctUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
...
...
@@ -115,13 +115,13 @@ int32_t mnodeInitAccts() {
.
fpRestored
=
mnodeAcctActionRestored
};
tsAcctSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsAcctSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsAcctSdb
==
NULL
)
{
mError
(
"table:%s, failed to create hash"
,
tableDesc
.
tableN
ame
);
mError
(
"table:%s, failed to create hash"
,
desc
.
n
ame
);
return
-
1
;
}
mDebug
(
"table:%s, hash is created"
,
tableDesc
.
tableN
ame
);
mDebug
(
"table:%s, hash is created"
,
desc
.
n
ame
);
return
TSDB_CODE_SUCCESS
;
}
...
...
src/mnode/src/mnodeCluster.c
浏览文件 @
cf5bcc4b
...
...
@@ -84,9 +84,9 @@ int32_t mnodeInitCluster() {
SClusterObj
tObj
;
tsClusterUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_CLUSTER
,
.
tableName
=
"cluster"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_CLUSTER
,
.
name
=
"cluster"
,
.
hashSessions
=
TSDB_DEFAULT_CLUSTER_HASH_SIZE
,
.
maxRowSize
=
tsClusterUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
...
...
@@ -100,16 +100,16 @@ int32_t mnodeInitCluster() {
.
fpRestored
=
mnodeClusterActionRestored
};
tsClusterSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsClusterSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsClusterSdb
==
NULL
)
{
mError
(
"table:%s, failed to create hash"
,
tableDesc
.
tableN
ame
);
mError
(
"table:%s, failed to create hash"
,
desc
.
n
ame
);
return
-
1
;
}
mnodeAddShowMetaHandle
(
TSDB_MGMT_TABLE_CLUSTER
,
mnodeGetClusterMeta
);
mnodeAddShowRetrieveHandle
(
TSDB_MGMT_TABLE_CLUSTER
,
mnodeRetrieveClusters
);
mDebug
(
"table:%s, hash is created"
,
tableDesc
.
tableN
ame
);
mDebug
(
"table:%s, hash is created"
,
desc
.
n
ame
);
return
TSDB_CODE_SUCCESS
;
}
...
...
src/mnode/src/mnodeDb.c
浏览文件 @
cf5bcc4b
...
...
@@ -144,9 +144,9 @@ int32_t mnodeInitDbs() {
SDbObj
tObj
;
tsDbUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_DB
,
.
tableName
=
"dbs"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_DB
,
.
name
=
"dbs"
,
.
hashSessions
=
TSDB_DEFAULT_DBS_HASH_SIZE
,
.
maxRowSize
=
tsDbUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
...
...
@@ -160,7 +160,7 @@ int32_t mnodeInitDbs() {
.
fpRestored
=
mnodeDbActionRestored
};
tsDbSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsDbSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsDbSdb
==
NULL
)
{
mError
(
"failed to init db data"
);
return
-
1
;
...
...
src/mnode/src/mnodeDnode.c
浏览文件 @
cf5bcc4b
...
...
@@ -171,9 +171,9 @@ int32_t mnodeInitDnodes() {
tsDnodeUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
pthread_mutex_init
(
&
tsDnodeEpsMutex
,
NULL
);
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_DNODE
,
.
tableName
=
"dnodes"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_DNODE
,
.
name
=
"dnodes"
,
.
hashSessions
=
TSDB_DEFAULT_DNODES_HASH_SIZE
,
.
maxRowSize
=
tsDnodeUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
...
...
@@ -187,7 +187,7 @@ int32_t mnodeInitDnodes() {
.
fpRestored
=
mnodeDnodeActionRestored
};
tsDnodeSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsDnodeSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsDnodeSdb
==
NULL
)
{
mError
(
"failed to init dnodes data"
);
return
-
1
;
...
...
src/mnode/src/mnodeMnode.c
浏览文件 @
cf5bcc4b
...
...
@@ -137,9 +137,9 @@ int32_t mnodeInitMnodes() {
SMnodeObj
tObj
;
tsMnodeUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_MNODE
,
.
tableName
=
"mnodes"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_MNODE
,
.
name
=
"mnodes"
,
.
hashSessions
=
TSDB_DEFAULT_MNODES_HASH_SIZE
,
.
maxRowSize
=
tsMnodeUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
...
...
@@ -153,7 +153,7 @@ int32_t mnodeInitMnodes() {
.
fpRestored
=
mnodeMnodeActionRestored
};
tsMnodeSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsMnodeSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsMnodeSdb
==
NULL
)
{
mError
(
"failed to init mnodes data"
);
return
-
1
;
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
cf5bcc4b
...
...
@@ -56,8 +56,8 @@ char *actStr[] = {
};
typedef
struct
SSdbTable
{
char
tableN
ame
[
SDB_TABLE_LEN
];
ESdbTable
tableI
d
;
char
n
ame
[
SDB_TABLE_LEN
];
ESdbTable
i
d
;
ESdbKey
keyType
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
...
...
@@ -201,7 +201,7 @@ static void sdbRestoreTables() {
totalRows
+=
pTable
->
numOfRows
;
numOfTables
++
;
sdbDebug
(
"vgId:1, sdb:%s is restored, rows:%"
PRId64
,
pTable
->
tableN
ame
,
pTable
->
numOfRows
);
sdbDebug
(
"vgId:1, sdb:%s is restored, rows:%"
PRId64
,
pTable
->
n
ame
,
pTable
->
numOfRows
);
}
sdbInfo
(
"vgId:1, sdb is restored, mver:%"
PRIu64
" rows:%d tables:%d"
,
tsSdbMgmt
.
version
,
totalRows
,
numOfTables
);
...
...
@@ -248,6 +248,21 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
sdbUpdateMnodeRoles
();
}
// failed to forward, need revert insert
static
void
sdbHandleFailedConfirm
(
SSWriteMsg
*
pWrite
)
{
SWalHead
*
pHead
=
(
SWalHead
*
)((
char
*
)
pWrite
+
sizeof
(
SSWriteMsg
)
+
SDB_SYNC_HACK
);
int32_t
action
=
pHead
->
msgType
%
10
;
sdbError
(
"vgId:1, row:%p:%s hver:%"
PRIu64
" action:%s, failed to foward since %s"
,
pWrite
->
pRow
,
sdbGetKeyStr
(
pWrite
->
pTable
,
pHead
->
cont
),
pHead
->
version
,
actStr
[
action
],
tstrerror
(
pWrite
->
code
));
// It's better to create a table in two stages, create it first and then set it success
if
(
action
==
SDB_ACTION_INSERT
)
{
SSWriteMsg
wmsg
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
pWrite
->
pTable
,
.
pRow
=
pWrite
->
pRow
};
sdbDeleteRow
(
&
wmsg
);
}
}
FORCE_INLINE
static
void
sdbConfirmForward
(
void
*
ahandle
,
void
*
wparam
,
int32_t
code
)
{
if
(
wparam
==
NULL
)
return
;
...
...
@@ -263,26 +278,12 @@ static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) {
if
(
pMsg
!=
NULL
)
sdbTrace
(
"vgId:1, msg:%p is confirmed, code:%x"
,
pMsg
,
code
);
}
// failed to forward, need revert insert
if
(
pWrite
->
code
!=
TSDB_CODE_SUCCESS
)
{
SWalHead
*
pHead
=
(
SWalHead
*
)((
char
*
)
pWrite
+
sizeof
(
SSWriteMsg
)
+
SDB_SYNC_HACK
);
int32_t
action
=
pHead
->
msgType
%
10
;
sdbError
(
"vgId:1, row:%p:%s hver:%"
PRIu64
" action:%s, failed to foward since %s"
,
pWrite
->
pRow
,
sdbGetKeyStr
(
pWrite
->
pTable
,
pHead
->
cont
),
pHead
->
version
,
actStr
[
action
],
tstrerror
(
pWrite
->
code
));
if
(
action
==
SDB_ACTION_INSERT
)
{
// It's better to create a table in two stages, create it first and then set it success
SSWriteMsg
wmsg
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
pTable
=
pWrite
->
pTable
,
.
pRow
=
pWrite
->
pRow
};
sdbDeleteRow
(
&
wmsg
);
}
}
if
(
pWrite
->
code
!=
TSDB_CODE_SUCCESS
)
sdbHandleFailedConfirm
(
pWrite
);
if
(
pWrite
->
fpRsp
!=
NULL
)
{
pWrite
->
code
=
(
*
pWrite
->
fpRsp
)(
pMsg
,
pWrite
->
code
);
}
dnodeSendRpcMWriteRsp
(
pMsg
,
pWrite
->
code
);
// if ahandle, means this func is called by sdb write
...
...
@@ -439,7 +440,7 @@ void sdbIncRef(void *tparam, void *pRow) {
SSdbTable
*
pTable
=
tparam
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_add_fetch_32
(
pRefCount
,
1
);
sdbTrace
(
"vgId:1, sdb:%s, inc ref to row:%p:%s:%d"
,
pTable
->
tableN
ame
,
pRow
,
sdbGetRowStr
(
pTable
,
pRow
),
refCount
);
sdbTrace
(
"vgId:1, sdb:%s, inc ref to row:%p:%s:%d"
,
pTable
->
n
ame
,
pRow
,
sdbGetRowStr
(
pTable
,
pRow
),
refCount
);
}
void
sdbDecRef
(
void
*
tparam
,
void
*
pRow
)
{
...
...
@@ -448,11 +449,11 @@ void sdbDecRef(void *tparam, void *pRow) {
SSdbTable
*
pTable
=
tparam
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_sub_fetch_32
(
pRefCount
,
1
);
sdbTrace
(
"vgId:1, sdb:%s, dec ref to row:%p:%s:%d"
,
pTable
->
tableN
ame
,
pRow
,
sdbGetRowStr
(
pTable
,
pRow
),
refCount
);
sdbTrace
(
"vgId:1, sdb:%s, dec ref to row:%p:%s:%d"
,
pTable
->
n
ame
,
pRow
,
sdbGetRowStr
(
pTable
,
pRow
),
refCount
);
int32_t
*
updateEnd
=
pRow
+
pTable
->
refCountPos
-
4
;
if
(
refCount
<=
0
&&
*
updateEnd
)
{
sdbTrace
(
"vgId:1, sdb:%s, row:%p:%s:%d destroyed"
,
pTable
->
tableN
ame
,
pRow
,
sdbGetRowStr
(
pTable
,
pRow
),
refCount
);
sdbTrace
(
"vgId:1, sdb:%s, row:%p:%s:%d destroyed"
,
pTable
->
n
ame
,
pRow
,
sdbGetRowStr
(
pTable
,
pRow
),
refCount
);
SSWriteMsg
wmsg
=
{.
pRow
=
pRow
};
(
*
pTable
->
fpDestroy
)(
&
wmsg
);
}
...
...
@@ -512,12 +513,12 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
atomic_add_fetch_32
(
&
pTable
->
autoIndex
,
1
);
}
sdbDebug
(
"vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%"
PRId64
", msg:%p"
,
pTable
->
tableN
ame
,
sdbDebug
(
"vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%"
PRId64
", msg:%p"
,
pTable
->
n
ame
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
),
pWrite
->
rowSize
,
pTable
->
numOfRows
,
pWrite
->
pMsg
);
int32_t
code
=
(
*
pTable
->
fpInsert
)(
pWrite
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"vgId:1, sdb:%s, failed to insert key:%s to hash, remove it"
,
pTable
->
tableN
ame
,
sdbError
(
"vgId:1, sdb:%s, failed to insert key:%s to hash, remove it"
,
pTable
->
n
ame
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
));
sdbDeleteHash
(
pTable
,
pWrite
);
}
...
...
@@ -529,7 +530,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
int32_t
*
updateEnd
=
pWrite
->
pRow
+
pTable
->
refCountPos
-
4
;
bool
set
=
atomic_val_compare_exchange_32
(
updateEnd
,
0
,
1
)
==
0
;
if
(
!
set
)
{
sdbError
(
"vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed"
,
pTable
->
tableN
ame
,
sdbError
(
"vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed"
,
pTable
->
n
ame
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
));
return
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
;
}
...
...
@@ -548,7 +549,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
atomic_sub_fetch_32
(
&
pTable
->
numOfRows
,
1
);
sdbDebug
(
"vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
tableN
ame
,
sdbDebug
(
"vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
n
ame
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
),
pTable
->
numOfRows
,
pWrite
->
pMsg
);
sdbDecRef
(
pTable
,
pWrite
->
pRow
);
...
...
@@ -557,7 +558,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
}
static
int32_t
sdbUpdateHash
(
SSdbTable
*
pTable
,
SSWriteMsg
*
pWrite
)
{
sdbDebug
(
"vgId:1, sdb:%s, update key:%s in hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
tableN
ame
,
sdbDebug
(
"vgId:1, sdb:%s, update key:%s in hash, numOfRows:%"
PRId64
", msg:%p"
,
pTable
->
n
ame
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
),
pTable
->
numOfRows
,
pWrite
->
pMsg
);
(
*
pTable
->
fpUpdate
)(
pWrite
);
...
...
@@ -584,12 +585,12 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
if
(
pHead
->
version
<=
tsSdbMgmt
.
version
)
{
pthread_mutex_unlock
(
&
tsSdbMgmt
.
mutex
);
sdbDebug
(
"vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%"
PRIu64
" too large, mver:%"
PRIu64
,
pTable
->
tableN
ame
,
actStr
[
action
],
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
qtype
,
pHead
->
version
,
tsSdbMgmt
.
version
);
pTable
->
n
ame
,
actStr
[
action
],
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
qtype
,
pHead
->
version
,
tsSdbMgmt
.
version
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
pHead
->
version
!=
tsSdbMgmt
.
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdbMgmt
.
mutex
);
sdbError
(
"vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%"
PRIu64
" too large, mver:%"
PRIu64
,
pTable
->
tableN
ame
,
actStr
[
action
],
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
qtype
,
pHead
->
version
,
tsSdbMgmt
.
version
);
pTable
->
n
ame
,
actStr
[
action
],
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
qtype
,
pHead
->
version
,
tsSdbMgmt
.
version
);
return
TSDB_CODE_SYN_INVALID_VERSION
;
}
else
{
tsSdbMgmt
.
version
=
pHead
->
version
;
...
...
@@ -612,19 +613,19 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
if
(
syncCode
<=
0
)
pWrite
->
processedCount
=
1
;
if
(
syncCode
<
0
)
{
sdbError
(
"vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%"
PRIu64
", msg:%p"
,
pTable
->
tableN
ame
,
sdbError
(
"vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%"
PRIu64
", msg:%p"
,
pTable
->
n
ame
,
tstrerror
(
syncCode
),
actStr
[
action
],
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
pWrite
->
pMsg
);
}
else
if
(
syncCode
>
0
)
{
sdbDebug
(
"vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%"
PRIu64
", msg:%p"
,
pTable
->
tableN
ame
,
sdbDebug
(
"vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%"
PRIu64
", msg:%p"
,
pTable
->
n
ame
,
actStr
[
action
],
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
pWrite
->
pMsg
);
}
else
{
sdbTrace
(
"vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%"
PRIu64
", msg:%p"
,
pTable
->
tableN
ame
,
sdbTrace
(
"vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%"
PRIu64
", msg:%p"
,
pTable
->
n
ame
,
actStr
[
action
],
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
pWrite
->
pMsg
);
}
return
syncCode
;
}
sdbDebug
(
"vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%"
PRIu64
,
pTable
->
tableN
ame
,
sdbDebug
(
"vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%"
PRIu64
,
pTable
->
n
ame
,
actStr
[
action
],
sdbGetKeyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
);
// even it is WAL/FWD, it shall be called to update version in sync
...
...
@@ -638,7 +639,7 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
}
else
if
(
action
==
SDB_ACTION_DELETE
)
{
void
*
pRow
=
sdbGetRowMeta
(
pTable
,
pHead
->
cont
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"vgId:1, sdb:%s, object:%s not exist in hash, ignore delete action"
,
pTable
->
tableN
ame
,
sdbDebug
(
"vgId:1, sdb:%s, object:%s not exist in hash, ignore delete action"
,
pTable
->
n
ame
,
sdbGetKeyStr
(
pTable
,
pHead
->
cont
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -647,7 +648,7 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
}
else
if
(
action
==
SDB_ACTION_UPDATE
)
{
void
*
pRow
=
sdbGetRowMeta
(
pTable
,
pHead
->
cont
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"vgId:1, sdb:%s, object:%s not exist in hash, ignore update action"
,
pTable
->
tableN
ame
,
sdbDebug
(
"vgId:1, sdb:%s, object:%s not exist in hash, ignore update action"
,
pTable
->
n
ame
,
sdbGetKeyStr
(
pTable
,
pHead
->
cont
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -664,7 +665,7 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) {
if
(
pTable
==
NULL
)
return
TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE
;
if
(
sdbGetRowFromObj
(
pTable
,
pWrite
->
pRow
))
{
sdbError
(
"vgId:1, sdb:%s, failed to insert key:%s
, already exist"
,
pTable
->
tableN
ame
,
sdbError
(
"vgId:1, sdb:%s, failed to insert key:%s
since it already exist"
,
pTable
->
n
ame
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
));
sdbDecRef
(
pTable
,
pWrite
->
pRow
);
return
TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE
;
...
...
@@ -674,14 +675,14 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) {
*
((
uint32_t
*
)
pWrite
->
pRow
)
=
atomic_add_fetch_32
(
&
pTable
->
autoIndex
,
1
);
// let vgId increase from 2
if
(
pTable
->
autoIndex
==
1
&&
strcmp
(
pTable
->
tableN
ame
,
"vgroups"
)
==
0
)
{
if
(
pTable
->
autoIndex
==
1
&&
strcmp
(
pTable
->
n
ame
,
"vgroups"
)
==
0
)
{
*
((
uint32_t
*
)
pWrite
->
pRow
)
=
atomic_add_fetch_32
(
&
pTable
->
autoIndex
,
1
);
}
}
int32_t
code
=
sdbInsertHash
(
pTable
,
pWrite
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"vgId:1, sdb:%s, failed to insert into hash"
,
pTable
->
tableN
ame
);
sdbError
(
"vgId:1, sdb:%s, failed to insert into hash"
,
pTable
->
n
ame
);
return
code
;
}
...
...
@@ -707,7 +708,7 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) {
SWalHead
*
pHead
=
(
SWalHead
*
)((
char
*
)
pNewWrite
+
sizeof
(
SSWriteMsg
)
+
SDB_SYNC_HACK
);
pHead
->
version
=
0
;
pHead
->
len
=
pWrite
->
rowSize
;
pHead
->
msgType
=
pTable
->
tableI
d
*
10
+
SDB_ACTION_INSERT
;
pHead
->
msgType
=
pTable
->
i
d
*
10
+
SDB_ACTION_INSERT
;
pWrite
->
rowData
=
pHead
->
cont
;
(
*
pTable
->
fpEncode
)(
pWrite
);
...
...
@@ -717,7 +718,7 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) {
if
(
pNewWrite
->
pMsg
!=
NULL
)
{
sdbDebug
(
"vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, insert action is add to sdb queue"
,
pNewWrite
->
pMsg
->
rpcMsg
.
ahandle
,
pNewWrite
->
pMsg
,
pTable
->
tableN
ame
,
pWrite
->
pRow
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
));
pNewWrite
->
pMsg
,
pTable
->
n
ame
,
pWrite
->
pRow
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
));
}
sdbIncRef
(
pNewWrite
->
pTable
,
pNewWrite
->
pRow
);
...
...
@@ -740,7 +741,7 @@ int32_t sdbDeleteRow(SSWriteMsg *pWrite) {
void
*
pRow
=
sdbGetRowMetaFromObj
(
pTable
,
pWrite
->
pRow
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"vgId:1, sdb:%s, record is not there, delete failed"
,
pTable
->
tableN
ame
);
sdbDebug
(
"vgId:1, sdb:%s, record is not there, delete failed"
,
pTable
->
n
ame
);
return
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
;
}
...
...
@@ -748,7 +749,7 @@ int32_t sdbDeleteRow(SSWriteMsg *pWrite) {
int32_t
code
=
sdbDeleteHash
(
pTable
,
pWrite
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"vgId:1, sdb:%s, failed to delete from hash"
,
pTable
->
tableN
ame
);
sdbError
(
"vgId:1, sdb:%s, failed to delete from hash"
,
pTable
->
n
ame
);
sdbDecRef
(
pTable
,
pWrite
->
pRow
);
return
code
;
}
...
...
@@ -775,7 +776,7 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) {
SWalHead
*
pHead
=
(
SWalHead
*
)((
void
*
)
pNewWrite
+
sizeof
(
SSWriteMsg
)
+
SDB_SYNC_HACK
);
pHead
->
version
=
0
;
pHead
->
msgType
=
pTable
->
tableI
d
*
10
+
SDB_ACTION_DELETE
;
pHead
->
msgType
=
pTable
->
i
d
*
10
+
SDB_ACTION_DELETE
;
pWrite
->
rowData
=
pHead
->
cont
;
(
*
pTable
->
fpEncode
)(
pWrite
);
...
...
@@ -785,7 +786,7 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) {
if
(
pNewWrite
->
pMsg
!=
NULL
)
{
sdbDebug
(
"vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, delete action is add to sdb queue"
,
pNewWrite
->
pMsg
->
rpcMsg
.
ahandle
,
pNewWrite
->
pMsg
,
pTable
->
tableN
ame
,
pWrite
->
pRow
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
));
pNewWrite
->
pMsg
,
pTable
->
n
ame
,
pWrite
->
pRow
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
));
}
taosWriteQitem
(
tsSdbWQueue
,
TAOS_QTYPE_RPC
,
pNewWrite
);
...
...
@@ -799,13 +800,13 @@ int32_t sdbUpdateRow(SSWriteMsg *pWrite) {
void
*
pRow
=
sdbGetRowMetaFromObj
(
pTable
,
pWrite
->
pRow
);
if
(
pRow
==
NULL
)
{
sdbDebug
(
"vgId:1, sdb:%s, record is not there, update failed"
,
pTable
->
tableN
ame
);
sdbDebug
(
"vgId:1, sdb:%s, record is not there, update failed"
,
pTable
->
n
ame
);
return
TSDB_CODE_MND_SDB_OBJ_NOT_THERE
;
}
int32_t
code
=
sdbUpdateHash
(
pTable
,
pWrite
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"vgId:1, sdb:%s, failed to update hash"
,
pTable
->
tableN
ame
);
sdbError
(
"vgId:1, sdb:%s, failed to update hash"
,
pTable
->
n
ame
);
return
code
;
}
...
...
@@ -830,7 +831,7 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) {
SWalHead
*
pHead
=
(
SWalHead
*
)((
void
*
)
pNewWrite
+
sizeof
(
SSWriteMsg
)
+
SDB_SYNC_HACK
);
pHead
->
version
=
0
;
pHead
->
msgType
=
pTable
->
tableI
d
*
10
+
SDB_ACTION_UPDATE
;
pHead
->
msgType
=
pTable
->
i
d
*
10
+
SDB_ACTION_UPDATE
;
pWrite
->
rowData
=
pHead
->
cont
;
(
*
pTable
->
fpEncode
)(
pWrite
);
...
...
@@ -840,7 +841,7 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) {
if
(
pNewWrite
->
pMsg
!=
NULL
)
{
sdbDebug
(
"vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, update action is add to sdb queue"
,
pNewWrite
->
pMsg
->
rpcMsg
.
ahandle
,
pNewWrite
->
pMsg
,
pTable
->
tableN
ame
,
pWrite
->
pRow
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
));
pNewWrite
->
pMsg
,
pTable
->
n
ame
,
pWrite
->
pRow
,
sdbGetRowStr
(
pTable
,
pWrite
->
pRow
));
}
sdbIncRef
(
pNewWrite
->
pTable
,
pNewWrite
->
pRow
);
...
...
@@ -888,9 +889,9 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
if
(
pTable
==
NULL
)
return
NULL
;
pthread_mutex_init
(
&
pTable
->
mutex
,
NULL
);
tstrncpy
(
pTable
->
tableName
,
pDesc
->
tableN
ame
,
SDB_TABLE_LEN
);
tstrncpy
(
pTable
->
name
,
pDesc
->
n
ame
,
SDB_TABLE_LEN
);
pTable
->
keyType
=
pDesc
->
keyType
;
pTable
->
tableId
=
pDesc
->
tableI
d
;
pTable
->
id
=
pDesc
->
i
d
;
pTable
->
hashSessions
=
pDesc
->
hashSessions
;
pTable
->
maxRowSize
=
pDesc
->
maxRowSize
;
pTable
->
refCountPos
=
pDesc
->
refCountPos
;
...
...
@@ -909,7 +910,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable
->
iHandle
=
taosHashInit
(
pTable
->
hashSessions
,
hashFp
,
true
,
true
);
tsSdbMgmt
.
numOfTables
++
;
tsSdbMgmt
.
tableList
[
pTable
->
tableI
d
]
=
pTable
;
tsSdbMgmt
.
tableList
[
pTable
->
i
d
]
=
pTable
;
return
pTable
;
}
...
...
@@ -918,7 +919,7 @@ void sdbCloseTable(void *handle) {
if
(
pTable
==
NULL
)
return
;
tsSdbMgmt
.
numOfTables
--
;
tsSdbMgmt
.
tableList
[
pTable
->
tableI
d
]
=
NULL
;
tsSdbMgmt
.
tableList
[
pTable
->
i
d
]
=
NULL
;
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pTable
->
iHandle
);
while
(
taosHashIterNext
(
pIter
))
{
...
...
@@ -937,7 +938,7 @@ void sdbCloseTable(void *handle) {
taosHashCleanup
(
pTable
->
iHandle
);
pthread_mutex_destroy
(
&
pTable
->
mutex
);
sdbDebug
(
"vgId:1, sdb:%s, is closed, numOfTables:%d"
,
pTable
->
tableN
ame
,
tsSdbMgmt
.
numOfTables
);
sdbDebug
(
"vgId:1, sdb:%s, is closed, numOfTables:%d"
,
pTable
->
n
ame
,
tsSdbMgmt
.
numOfTables
);
free
(
pTable
);
}
...
...
@@ -1062,7 +1063,7 @@ static void *sdbWorkerFp(void *pWorker) {
pHead
=
(
void
*
)
pWrite
+
sizeof
(
SSWriteMsg
)
+
SDB_SYNC_HACK
;
if
(
pWrite
->
pMsg
!=
NULL
)
{
sdbDebug
(
"vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s hver:%"
PRIu64
", will be processed in sdb queue"
,
pWrite
->
pMsg
->
rpcMsg
.
ahandle
,
pWrite
->
pMsg
,
pWrite
->
pTable
->
tableN
ame
,
pWrite
->
pRow
,
pWrite
->
pMsg
->
rpcMsg
.
ahandle
,
pWrite
->
pMsg
,
pWrite
->
pTable
->
n
ame
,
pWrite
->
pRow
,
sdbGetKeyStr
(
pWrite
->
pTable
,
pHead
->
cont
),
pHead
->
version
);
}
}
else
{
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
cf5bcc4b
...
...
@@ -352,8 +352,8 @@ static int32_t mnodeInitChildTables() {
tsChildTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
.
info
.
type
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_CTABLE
,
.
tableName
=
"ctables"
,
.
id
=
SDB_TABLE_CTABLE
,
.
name
=
"ctables"
,
.
hashSessions
=
TSDB_DEFAULT_CTABLES_HASH_SIZE
,
.
maxRowSize
=
sizeof
(
SCTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_FNAME_LEN
+
TSDB_CQ_SQL_SIZE
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
...
...
@@ -545,8 +545,8 @@ static int32_t mnodeInitSuperTables() {
tsSuperTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
.
info
.
type
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_STABLE
,
.
tableName
=
"stables"
,
.
id
=
SDB_TABLE_STABLE
,
.
name
=
"stables"
,
.
hashSessions
=
TSDB_DEFAULT_STABLES_HASH_SIZE
,
.
maxRowSize
=
sizeof
(
SSTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_FNAME_LEN
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
...
...
src/mnode/src/mnodeUser.c
浏览文件 @
cf5bcc4b
...
...
@@ -150,9 +150,9 @@ int32_t mnodeInitUsers() {
SUserObj
tObj
;
tsUserUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_USER
,
.
tableName
=
"users"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_USER
,
.
name
=
"users"
,
.
hashSessions
=
TSDB_DEFAULT_USERS_HASH_SIZE
,
.
maxRowSize
=
tsUserUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
...
...
@@ -166,9 +166,9 @@ int32_t mnodeInitUsers() {
.
fpRestored
=
mnodeUserActionRestored
};
tsUserSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsUserSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsUserSdb
==
NULL
)
{
mError
(
"table:%s, failed to create hash"
,
tableDesc
.
tableN
ame
);
mError
(
"table:%s, failed to create hash"
,
desc
.
n
ame
);
return
-
1
;
}
...
...
@@ -179,7 +179,7 @@ int32_t mnodeInitUsers() {
mnodeAddShowRetrieveHandle
(
TSDB_MGMT_TABLE_USER
,
mnodeRetrieveUsers
);
mnodeAddPeerMsgHandle
(
TSDB_MSG_TYPE_DM_AUTH
,
mnodeProcessAuthMsg
);
mDebug
(
"table:%s, hash is created"
,
tableDesc
.
tableN
ame
);
mDebug
(
"table:%s, hash is created"
,
desc
.
n
ame
);
return
0
;
}
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
cf5bcc4b
...
...
@@ -206,9 +206,9 @@ int32_t mnodeInitVgroups() {
SVgObj
tObj
;
tsVgUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableD
esc
=
{
.
tableId
=
SDB_TABLE_VGROUP
,
.
tableName
=
"vgroups"
,
SSdbTableDesc
d
esc
=
{
.
id
=
SDB_TABLE_VGROUP
,
.
name
=
"vgroups"
,
.
hashSessions
=
TSDB_DEFAULT_VGROUPS_HASH_SIZE
,
.
maxRowSize
=
tsVgUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
...
...
@@ -222,7 +222,7 @@ int32_t mnodeInitVgroups() {
.
fpRestored
=
mnodeVgroupActionRestored
,
};
tsVgroupSdb
=
sdbOpenTable
(
&
tableD
esc
);
tsVgroupSdb
=
sdbOpenTable
(
&
d
esc
);
if
(
tsVgroupSdb
==
NULL
)
{
mError
(
"failed to init vgroups data"
);
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录