Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
15a0befa
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
15a0befa
编写于
4月 27, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
4月 27, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11928 from taosdata/feature/dnode
refactor(cluster): add cb func on transaction start and stop
上级
3abf351a
01dd33ae
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
194 addition
and
132 deletion
+194
-132
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+27
-2
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+4
-4
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+10
-1
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+65
-14
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+72
-73
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+2
-2
source/dnode/mnode/sdb/inc/sdbInt.h
source/dnode/mnode/sdb/inc/sdbInt.h
+0
-24
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+1
-1
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+8
-8
source/dnode/mnode/sdb/src/sdbRaw.c
source/dnode/mnode/sdb/src/sdbRaw.c
+3
-1
source/dnode/mnode/sdb/src/sdbRow.c
source/dnode/mnode/sdb/src/sdbRow.c
+2
-2
未找到文件。
include/dnode/mnode/sdb/sdb.h
浏览文件 @
15a0befa
...
...
@@ -18,6 +18,11 @@
#include "os.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -135,7 +140,7 @@ typedef enum {
typedef
struct
SSdb
SSdb
;
typedef
int32_t
(
*
SdbInsertFp
)(
SSdb
*
pSdb
,
void
*
pObj
);
typedef
int32_t
(
*
SdbUpdateFp
)(
SSdb
*
pSdb
,
void
*
pSrcObj
,
void
*
pDstObj
);
typedef
int32_t
(
*
SdbDeleteFp
)(
SSdb
*
pSdb
,
void
*
pObj
);
typedef
int32_t
(
*
SdbDeleteFp
)(
SSdb
*
pSdb
,
void
*
pObj
,
bool
callFunc
);
typedef
int32_t
(
*
SdbDeployFp
)(
SMnode
*
pMnode
);
typedef
SSdbRow
*
(
*
SdbDecodeFp
)(
SSdbRaw
*
pRaw
);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
...
...
@@ -326,9 +331,29 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver);
int32_t
sdbGetRawTotalSize
(
SSdbRaw
*
pRaw
);
SSdbRow
*
sdbAllocRow
(
int32_t
objSize
);
void
sdbFreeRow
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
);
void
sdbFreeRow
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
,
bool
callFunc
);
void
*
sdbGetRowObj
(
SSdbRow
*
pRow
);
typedef
struct
SSdb
{
SMnode
*
pMnode
;
char
*
currDir
;
char
*
syncDir
;
char
*
tmpDir
;
int64_t
lastCommitVer
;
int64_t
curVer
;
int64_t
tableVer
[
SDB_MAX
];
int64_t
maxId
[
SDB_MAX
];
EKeyType
keyTypes
[
SDB_MAX
];
SHashObj
*
hashObjs
[
SDB_MAX
];
SRWLatch
locks
[
SDB_MAX
];
SdbInsertFp
insertFps
[
SDB_MAX
];
SdbUpdateFp
updateFps
[
SDB_MAX
];
SdbDeleteFp
deleteFps
[
SDB_MAX
];
SdbDeployFp
deployFps
[
SDB_MAX
];
SdbEncodeFp
encodeFps
[
SDB_MAX
];
SdbDecodeFp
decodeFps
[
SDB_MAX
];
}
SSdb
;
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
15a0befa
...
...
@@ -126,8 +126,6 @@ typedef enum {
DND_REASON_OTHERS
}
EDndReason
;
typedef
void
(
*
TransCbFp
)(
SMnode
*
pMnode
,
void
*
param
);
typedef
struct
{
int32_t
id
;
ETrnStage
stage
;
...
...
@@ -150,8 +148,10 @@ typedef struct {
int64_t
dbUid
;
char
dbname
[
TSDB_DB_FNAME_LEN
];
char
lastError
[
TSDB_TRANS_ERROR_LEN
];
TransCbFp
transCbFp
;
void
*
transCbParam
;
int32_t
startFunc
;
int32_t
stopFunc
;
int32_t
paramLen
;
void
*
param
;
}
STrans
;
typedef
struct
{
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
15a0befa
...
...
@@ -33,6 +33,15 @@ typedef struct {
void
*
pCont
;
}
STransAction
;
typedef
enum
{
TEST_TRANS_START_FUNC
=
1
,
TEST_TRANS_STOP_FUNC
=
2
,
CONSUME_TRANS_START_FUNC
=
3
,
CONSUME_TRANS_STOP_FUNC
=
4
,
}
ETrnFuncType
;
typedef
void
(
*
TransCbFp
)(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
);
int32_t
mndInitTrans
(
SMnode
*
pMnode
);
void
mndCleanupTrans
(
SMnode
*
pMnode
);
...
...
@@ -44,7 +53,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
);
void
mndTransSetRpcRsp
(
STrans
*
pTrans
,
void
*
pCont
,
int32_t
contLen
);
void
mndTransSetCb
(
STrans
*
pTrans
,
TransCbFp
fp
,
void
*
param
);
void
mndTransSetCb
(
STrans
*
pTrans
,
ETrnFuncType
startFunc
,
ETrnFuncType
stopFunc
,
void
*
param
,
int32_t
paramLen
);
void
mndTransSetDbInfo
(
STrans
*
pTrans
,
SDbObj
*
pDb
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
15a0befa
...
...
@@ -29,7 +29,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static
SSdbRow
*
mndTransActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
static
int32_t
mndTransActionUpdate
(
SSdb
*
pSdb
,
STrans
*
OldTrans
,
STrans
*
pOld
);
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
,
bool
callFunc
);
static
int32_t
mndTransAppendLog
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
STransAction
*
pAction
);
...
...
@@ -174,6 +174,13 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pAction
->
pCont
,
pAction
->
contLen
,
TRANS_ENCODE_OVER
)
}
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTrans
->
startFunc
,
TRANS_ENCODE_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTrans
->
stopFunc
,
TRANS_ENCODE_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTrans
->
paramLen
,
TRANS_ENCODE_OVER
)
if
(
pTrans
->
param
!=
NULL
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTrans
->
param
,
pTrans
->
paramLen
,
TRANS_ENCODE_OVER
)
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_TRANS_RESERVE_SIZE
,
TRANS_ENCODE_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
TRANS_ENCODE_OVER
)
...
...
@@ -305,6 +312,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
action
.
pCont
=
NULL
;
}
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTrans
->
startFunc
,
TRANS_DECODE_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTrans
->
stopFunc
,
TRANS_DECODE_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTrans
->
paramLen
,
TRANS_DECODE_OVER
)
if
(
pTrans
->
paramLen
!=
0
)
{
pTrans
->
param
=
taosMemoryMalloc
(
pTrans
->
paramLen
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTrans
->
param
,
pTrans
->
paramLen
,
TRANS_DECODE_OVER
);
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TRANS_RESERVE_SIZE
,
TRANS_DECODE_OVER
)
terrno
=
0
;
...
...
@@ -413,9 +428,36 @@ static const char *mndTransType(ETrnType type) {
}
}
static
void
mndTransTestStartFunc
(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
)
{
mInfo
(
"test trans start, param:%s, len:%d"
,
(
char
*
)
param
,
paramLen
);
}
static
void
mndTransTestStopFunc
(
SMnode
*
pMnode
,
void
*
param
,
int32_t
paramLen
)
{
mInfo
(
"test trans stop, param:%s, len:%d"
,
(
char
*
)
param
,
paramLen
);
}
static
TransCbFp
mndTransGetCbFp
(
ETrnFuncType
ftype
)
{
switch
(
ftype
)
{
case
TEST_TRANS_START_FUNC
:
return
mndTransTestStartFunc
;
case
TEST_TRANS_STOP_FUNC
:
return
mndTransTestStopFunc
;
default:
return
NULL
;
}
}
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
// pTrans->stage = TRN_STAGE_PREPARE;
mTrace
(
"trans:%d, perform insert action, row:%p stage:%s"
,
pTrans
->
id
,
pTrans
,
mndTransStr
(
pTrans
->
stage
));
if
(
pTrans
->
startFunc
>
0
)
{
TransCbFp
fp
=
mndTransGetCbFp
(
pTrans
->
startFunc
);
if
(
fp
)
{
(
*
fp
)(
pSdb
->
pMnode
,
pTrans
->
param
,
pTrans
->
paramLen
);
}
}
return
0
;
}
...
...
@@ -430,10 +472,23 @@ static void mndTransDropData(STrans *pTrans) {
pTrans
->
rpcRsp
=
NULL
;
pTrans
->
rpcRspLen
=
0
;
}
if
(
pTrans
->
param
!=
NULL
)
{
taosMemoryFree
(
pTrans
->
param
);
pTrans
->
param
=
NULL
;
pTrans
->
paramLen
=
0
;
}
}
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
mTrace
(
"trans:%d, perform delete action, row:%p stage:%s"
,
pTrans
->
id
,
pTrans
,
mndTransStr
(
pTrans
->
stage
));
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
,
bool
callFunc
)
{
mDebug
(
"trans:%d, perform delete action, row:%p stage:%s callfunc:%d"
,
pTrans
->
id
,
pTrans
,
mndTransStr
(
pTrans
->
stage
),
callFunc
);
if
(
pTrans
->
stopFunc
>
0
&&
callFunc
)
{
TransCbFp
fp
=
mndTransGetCbFp
(
pTrans
->
stopFunc
);
if
(
fp
)
{
(
*
fp
)(
pSdb
->
pMnode
,
pTrans
->
param
,
pTrans
->
paramLen
);
}
}
mndTransDropData
(
pTrans
);
return
0
;
}
...
...
@@ -498,7 +553,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
return
NULL
;
}
mDebug
(
"trans:%d, is created, data:%p"
,
pTrans
->
id
,
pTrans
);
mDebug
(
"trans:%d,
local var
is created, data:%p"
,
pTrans
->
id
,
pTrans
);
return
pTrans
;
}
...
...
@@ -525,7 +580,7 @@ static void mndTransDropActions(SArray *pArray) {
void
mndTransDrop
(
STrans
*
pTrans
)
{
if
(
pTrans
!=
NULL
)
{
mndTransDropData
(
pTrans
);
mDebug
(
"trans:%d,
is dropp
ed, data:%p"
,
pTrans
->
id
,
pTrans
);
mDebug
(
"trans:%d,
local var is fre
ed, data:%p"
,
pTrans
->
id
,
pTrans
);
taosMemoryFreeClear
(
pTrans
);
}
}
...
...
@@ -574,9 +629,11 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) {
pTrans
->
rpcRspLen
=
contLen
;
}
void
mndTransSetCb
(
STrans
*
pTrans
,
TransCbFp
fp
,
void
*
param
)
{
pTrans
->
transCbFp
=
fp
;
pTrans
->
transCbParam
=
param
;
void
mndTransSetCb
(
STrans
*
pTrans
,
ETrnFuncType
startFunc
,
ETrnFuncType
stopFunc
,
void
*
param
,
int32_t
paramLen
)
{
pTrans
->
startFunc
=
startFunc
;
pTrans
->
stopFunc
=
stopFunc
;
pTrans
->
param
=
param
;
pTrans
->
paramLen
=
paramLen
;
}
void
mndTransSetDbInfo
(
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
...
...
@@ -712,8 +769,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pNew
->
rpcRefId
=
pTrans
->
rpcRefId
;
pNew
->
rpcRsp
=
pTrans
->
rpcRsp
;
pNew
->
rpcRspLen
=
pTrans
->
rpcRspLen
;
pNew
->
transCbFp
=
pTrans
->
transCbFp
;
pNew
->
transCbParam
=
pTrans
->
transCbParam
;
pTrans
->
rpcRsp
=
NULL
;
pTrans
->
rpcRspLen
=
0
;
...
...
@@ -1125,10 +1180,6 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
mDebug
(
"trans:%d, finished, code:0x%04x, failedTimes:%d"
,
pTrans
->
id
,
pTrans
->
code
,
pTrans
->
failedTimes
);
if
(
pTrans
->
transCbFp
!=
NULL
)
{
(
*
pTrans
->
transCbFp
)(
pMnode
,
pTrans
->
transCbParam
);
}
return
continueExec
;
}
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
15a0befa
...
...
@@ -96,36 +96,36 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
int32_t
size
=
sizeof
(
SUserObj
)
+
USER_RESERVE_SIZE
+
(
numOfReadDbs
+
numOfWriteDbs
)
*
TSDB_DB_FNAME_LEN
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_USER
,
USER_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
USER_ENCODE
_OVER
;
if
(
pRaw
==
NULL
)
goto
_OVER
;
int32_t
dataPos
=
0
;
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pUser
->
user
,
TSDB_USER_LEN
,
USER_ENCODE
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pUser
->
pass
,
TSDB_PASSWORD_LEN
,
USER_ENCODE
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pUser
->
acct
,
TSDB_USER_LEN
,
USER_ENCODE
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pUser
->
createdTime
,
USER_ENCODE
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pUser
->
updateTime
,
USER_ENCODE
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pUser
->
superUser
,
USER_ENCODE
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
numOfReadDbs
,
USER_ENCODE
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
numOfWriteDbs
,
USER_ENCODE
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pUser
->
user
,
TSDB_USER_LEN
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pUser
->
pass
,
TSDB_PASSWORD_LEN
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pUser
->
acct
,
TSDB_USER_LEN
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pUser
->
createdTime
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pUser
->
updateTime
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pUser
->
superUser
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
numOfReadDbs
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
numOfWriteDbs
,
_OVER
)
char
*
db
=
taosHashIterate
(
pUser
->
readDbs
,
NULL
);
while
(
db
!=
NULL
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
db
,
TSDB_DB_FNAME_LEN
,
USER_ENCODE
_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
db
,
TSDB_DB_FNAME_LEN
,
_OVER
);
db
=
taosHashIterate
(
pUser
->
readDbs
,
db
);
}
db
=
taosHashIterate
(
pUser
->
writeDbs
,
NULL
);
while
(
db
!=
NULL
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
db
,
TSDB_DB_FNAME_LEN
,
USER_ENCODE
_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
db
,
TSDB_DB_FNAME_LEN
,
_OVER
);
db
=
taosHashIterate
(
pUser
->
writeDbs
,
db
);
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
USER_RESERVE_SIZE
,
USER_ENCODE
_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
USER_ENCODE
_OVER
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
USER_RESERVE_SIZE
,
_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
_OVER
)
terrno
=
0
;
USER_ENCODE
_OVER:
_OVER:
if
(
terrno
!=
0
)
{
mError
(
"user:%s, failed to encode to raw:%p since %s"
,
pUser
->
user
,
pRaw
,
terrstr
());
sdbFreeRaw
(
pRaw
);
...
...
@@ -140,55 +140,54 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
USER_DECODE
_OVER
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
_OVER
;
if
(
sver
!=
USER_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
goto
USER_DECODE
_OVER
;
goto
_OVER
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SUserObj
));
if
(
pRow
==
NULL
)
goto
USER_DECODE
_OVER
;
if
(
pRow
==
NULL
)
goto
_OVER
;
SUserObj
*
pUser
=
sdbGetRowObj
(
pRow
);
if
(
pUser
==
NULL
)
goto
USER_DECODE_OVER
;
pUser
->
readDbs
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
true
);
pUser
->
writeDbs
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
true
);
if
(
pUser
->
readDbs
==
NULL
||
pUser
->
writeDbs
==
NULL
)
goto
USER_DECODE_OVER
;
if
(
pUser
==
NULL
)
goto
_OVER
;
int32_t
dataPos
=
0
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pUser
->
user
,
TSDB_USER_LEN
,
USER_DECODE
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pUser
->
pass
,
TSDB_PASSWORD_LEN
,
USER_DECODE
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pUser
->
acct
,
TSDB_USER_LEN
,
USER_DECODE
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pUser
->
createdTime
,
USER_DECODE
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pUser
->
updateTime
,
USER_DECODE
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pUser
->
superUser
,
USER_DECODE
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pUser
->
user
,
TSDB_USER_LEN
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pUser
->
pass
,
TSDB_PASSWORD_LEN
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pUser
->
acct
,
TSDB_USER_LEN
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pUser
->
createdTime
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pUser
->
updateTime
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pUser
->
superUser
,
_OVER
)
int32_t
numOfReadDbs
=
0
;
int32_t
numOfWriteDbs
=
0
;
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
numOfReadDbs
,
USER_DECODE_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
numOfWriteDbs
,
USER_DECODE_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
numOfReadDbs
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
numOfWriteDbs
,
_OVER
)
pUser
->
readDbs
=
taosHashInit
(
numOfReadDbs
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
true
);
pUser
->
writeDbs
=
taosHashInit
(
numOfWriteDbs
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
true
);
if
(
pUser
->
readDbs
==
NULL
||
pUser
->
writeDbs
==
NULL
)
goto
_OVER
;
for
(
int32_t
i
=
0
;
i
<
numOfReadDbs
;
++
i
)
{
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
SDB_GET_BINARY
(
pRaw
,
dataPos
,
db
,
TSDB_DB_FNAME_LEN
,
USER_DECODE
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
db
,
TSDB_DB_FNAME_LEN
,
_OVER
)
int32_t
len
=
strlen
(
db
)
+
1
;
taosHashPut
(
pUser
->
readDbs
,
db
,
len
,
db
,
TSDB_DB_FNAME_LEN
);
}
for
(
int32_t
i
=
0
;
i
<
numOfWriteDbs
;
++
i
)
{
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
SDB_GET_BINARY
(
pRaw
,
dataPos
,
db
,
TSDB_DB_FNAME_LEN
,
USER_DECODE
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
db
,
TSDB_DB_FNAME_LEN
,
_OVER
)
int32_t
len
=
strlen
(
db
)
+
1
;
taosHashPut
(
pUser
->
writeDbs
,
db
,
len
,
db
,
TSDB_DB_FNAME_LEN
);
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
USER_RESERVE_SIZE
,
USER_DECODE
_OVER
)
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
USER_RESERVE_SIZE
,
_OVER
)
terrno
=
0
;
USER_DECODE
_OVER:
_OVER:
if
(
terrno
!=
0
)
{
mError
(
"user:%s, failed to decode from raw:%p since %s"
,
pUser
->
user
,
pRaw
,
terrstr
());
taosHashCleanup
(
pUser
->
readDbs
);
...
...
@@ -220,6 +219,8 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
mTrace
(
"user:%s, perform delete action, row:%p"
,
pUser
->
user
,
pUser
);
taosHashCleanup
(
pUser
->
readDbs
);
taosHashCleanup
(
pUser
->
writeDbs
);
pUser
->
readDbs
=
NULL
;
pUser
->
writeDbs
=
NULL
;
return
0
;
}
...
...
@@ -228,13 +229,8 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
memcpy
(
pOld
->
pass
,
pNew
->
pass
,
TSDB_PASSWORD_LEN
);
pOld
->
updateTime
=
pNew
->
updateTime
;
void
*
tmp1
=
pOld
->
readDbs
;
pOld
->
readDbs
=
pNew
->
readDbs
;
pNew
->
readDbs
=
tmp1
;
void
*
tmp2
=
pOld
->
writeDbs
;
pOld
->
writeDbs
=
pNew
->
writeDbs
;
pNew
->
writeDbs
=
tmp2
;
TSWAP
(
pOld
->
readDbs
,
pNew
->
readDbs
,
(
void
*
));
TSWAP
(
pOld
->
writeDbs
,
pNew
->
writeDbs
,
(
void
*
));
return
0
;
}
...
...
@@ -277,6 +273,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_READY
);
char
*
param
=
strdup
(
"====> test code to be deleted later <====="
);
mndTransSetCb
(
pTrans
,
TEST_TRANS_START_FUNC
,
TEST_TRANS_STOP_FUNC
,
param
,
strlen
(
param
)
+
1
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
...
...
@@ -296,41 +295,41 @@ static int32_t mndProcessCreateUserReq(SNodeMsg *pReq) {
if
(
tDeserializeSCreateUserReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
CREATE_USER
_OVER
;
goto
_OVER
;
}
mDebug
(
"user:%s, start to create"
,
createReq
.
user
);
if
(
createReq
.
user
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_USER_FORMAT
;
goto
CREATE_USER
_OVER
;
goto
_OVER
;
}
if
(
createReq
.
pass
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_PASS_FORMAT
;
goto
CREATE_USER
_OVER
;
goto
_OVER
;
}
pUser
=
mndAcquireUser
(
pMnode
,
createReq
.
user
);
if
(
pUser
!=
NULL
)
{
terrno
=
TSDB_CODE_MND_USER_ALREADY_EXIST
;
goto
CREATE_USER
_OVER
;
goto
_OVER
;
}
pOperUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
if
(
pOperUser
==
NULL
)
{
terrno
=
TSDB_CODE_MND_NO_USER_FROM_CONN
;
goto
CREATE_USER
_OVER
;
goto
_OVER
;
}
if
(
mndCheckCreateUserAuth
(
pOperUser
)
!=
0
)
{
goto
CREATE_USER
_OVER
;
goto
_OVER
;
}
code
=
mndCreateUser
(
pMnode
,
pOperUser
->
acct
,
&
createReq
,
pReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
CREATE_USER
_OVER:
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"user:%s, failed to create since %s"
,
createReq
.
user
,
terrstr
());
}
...
...
@@ -399,38 +398,38 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
if
(
tDeserializeSAlterUserReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
mDebug
(
"user:%s, start to alter"
,
alterReq
.
user
);
if
(
alterReq
.
user
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_USER_FORMAT
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
if
(
alterReq
.
pass
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_PASS_FORMAT
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
pUser
=
mndAcquireUser
(
pMnode
,
alterReq
.
user
);
if
(
pUser
==
NULL
)
{
terrno
=
TSDB_CODE_MND_USER_NOT_EXIST
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
pOperUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
if
(
pOperUser
==
NULL
)
{
terrno
=
TSDB_CODE_MND_NO_USER_FROM_CONN
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
memcpy
(
&
newUser
,
pUser
,
sizeof
(
SUserObj
));
newUser
.
readDbs
=
mndDupDbHash
(
pUser
->
readDbs
);
newUser
.
writeDbs
=
mndDupDbHash
(
pUser
->
writeDbs
);
if
(
newUser
.
readDbs
==
NULL
||
newUser
.
writeDbs
==
NULL
)
{
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
int32_t
len
=
strlen
(
alterReq
.
dbname
)
+
1
;
...
...
@@ -446,50 +445,50 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
}
else
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_ADD_READ_DB
)
{
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
if
(
taosHashPut
(
newUser
.
readDbs
,
alterReq
.
dbname
,
len
,
alterReq
.
dbname
,
TSDB_DB_FNAME_LEN
)
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
}
else
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_REMOVE_READ_DB
)
{
if
(
taosHashRemove
(
newUser
.
readDbs
,
alterReq
.
dbname
,
len
)
!=
0
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
}
else
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_CLEAR_READ_DB
)
{
taosHashClear
(
newUser
.
readDbs
);
}
else
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_ADD_WRITE_DB
)
{
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
if
(
taosHashPut
(
newUser
.
writeDbs
,
alterReq
.
dbname
,
len
,
alterReq
.
dbname
,
TSDB_DB_FNAME_LEN
)
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
}
else
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_REMOVE_WRITE_DB
)
{
if
(
taosHashRemove
(
newUser
.
writeDbs
,
alterReq
.
dbname
,
len
)
!=
0
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
}
else
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_CLEAR_WRITE_DB
)
{
taosHashClear
(
newUser
.
writeDbs
);
}
else
{
terrno
=
TSDB_CODE_MND_INVALID_ALTER_OPER
;
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
newUser
.
updateTime
=
taosGetTimestampMs
();
if
(
mndCheckAlterUserAuth
(
pOperUser
,
pUser
,
pDb
,
&
alterReq
)
!=
0
)
{
goto
ALTER_USER
_OVER
;
goto
_OVER
;
}
code
=
mndUpdateUser
(
pMnode
,
pUser
,
&
newUser
,
pReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
ALTER_USER
_OVER:
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"user:%s, failed to alter since %s"
,
alterReq
.
user
,
terrstr
());
}
...
...
@@ -537,36 +536,36 @@ static int32_t mndProcessDropUserReq(SNodeMsg *pReq) {
if
(
tDeserializeSDropUserReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
DROP_USER
_OVER
;
goto
_OVER
;
}
mDebug
(
"user:%s, start to drop"
,
dropReq
.
user
);
if
(
dropReq
.
user
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_USER_FORMAT
;
goto
DROP_USER
_OVER
;
goto
_OVER
;
}
pUser
=
mndAcquireUser
(
pMnode
,
dropReq
.
user
);
if
(
pUser
==
NULL
)
{
terrno
=
TSDB_CODE_MND_USER_NOT_EXIST
;
goto
DROP_USER
_OVER
;
goto
_OVER
;
}
pOperUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
if
(
pOperUser
==
NULL
)
{
terrno
=
TSDB_CODE_MND_NO_USER_FROM_CONN
;
goto
DROP_USER
_OVER
;
goto
_OVER
;
}
if
(
mndCheckDropUserAuth
(
pOperUser
)
!=
0
)
{
goto
DROP_USER
_OVER
;
goto
_OVER
;
}
code
=
mndDropUser
(
pMnode
,
pReq
,
pUser
);
if
(
code
==
0
)
code
=
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
DROP_USER
_OVER:
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"user:%s, failed to drop since %s"
,
dropReq
.
user
,
terrstr
());
}
...
...
@@ -586,7 +585,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) {
if
(
tDeserializeSGetUserAuthReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
authReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
GET_AUTH
_OVER
;
goto
_OVER
;
}
mTrace
(
"user:%s, start to get auth"
,
authReq
.
user
);
...
...
@@ -594,7 +593,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) {
pUser
=
mndAcquireUser
(
pMnode
,
authReq
.
user
);
if
(
pUser
==
NULL
)
{
terrno
=
TSDB_CODE_MND_USER_NOT_EXIST
;
goto
GET_AUTH
_OVER
;
goto
_OVER
;
}
memcpy
(
authRsp
.
user
,
pUser
->
user
,
TSDB_USER_LEN
);
...
...
@@ -622,7 +621,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) {
void
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
GET_AUTH
_OVER
;
goto
_OVER
;
}
tSerializeSGetUserAuthRsp
(
pRsp
,
contLen
,
&
authRsp
);
...
...
@@ -631,7 +630,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) {
pReq
->
rspLen
=
contLen
;
code
=
0
;
GET_AUTH
_OVER:
_OVER:
mndReleaseUser
(
pMnode
,
pUser
);
taosHashCleanup
(
authRsp
.
readDbs
);
taosHashCleanup
(
authRsp
.
writeDbs
);
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
15a0befa
...
...
@@ -45,8 +45,8 @@ int32_t mndInitVgroup(SMnode *pMnode) {
.
encodeFp
=
(
SdbEncodeFp
)
mndVgroupActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndVgroupActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndVgroupActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndVgroupAction
Dele
te
,
.
deleteFp
=
(
SdbDeleteFp
)
mndVgroupAction
Upda
te
};
.
updateFp
=
(
SdbUpdateFp
)
mndVgroupAction
Upda
te
,
.
deleteFp
=
(
SdbDeleteFp
)
mndVgroupAction
Dele
te
};
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_VNODE_RSP
,
mndProcessCreateVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_VNODE_RSP
,
mndProcessAlterVnodeRsp
);
...
...
source/dnode/mnode/sdb/inc/sdbInt.h
浏览文件 @
15a0befa
...
...
@@ -19,10 +19,6 @@
#include "os.h"
#include "sdb.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -53,26 +49,6 @@ typedef struct SSdbRow {
char
pObj
[];
}
SSdbRow
;
typedef
struct
SSdb
{
SMnode
*
pMnode
;
char
*
currDir
;
char
*
syncDir
;
char
*
tmpDir
;
int64_t
lastCommitVer
;
int64_t
curVer
;
int64_t
tableVer
[
SDB_MAX
];
int64_t
maxId
[
SDB_MAX
];
EKeyType
keyTypes
[
SDB_MAX
];
SHashObj
*
hashObjs
[
SDB_MAX
];
SRWLatch
locks
[
SDB_MAX
];
SdbInsertFp
insertFps
[
SDB_MAX
];
SdbUpdateFp
updateFps
[
SDB_MAX
];
SdbDeleteFp
deleteFps
[
SDB_MAX
];
SdbDeployFp
deployFps
[
SDB_MAX
];
SdbEncodeFp
encodeFps
[
SDB_MAX
];
SdbDecodeFp
decodeFps
[
SDB_MAX
];
}
SSdb
;
const
char
*
sdbTableName
(
ESdbType
type
);
void
sdbPrintOper
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
,
const
char
*
oper
);
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
15a0befa
...
...
@@ -87,7 +87,7 @@ void sdbCleanup(SSdb *pSdb) {
SSdbRow
*
pRow
=
*
ppRow
;
if
(
pRow
==
NULL
)
continue
;
sdbFreeRow
(
pSdb
,
pRow
);
sdbFreeRow
(
pSdb
,
pRow
,
true
);
ppRow
=
taosHashIterate
(
hash
,
ppRow
);
}
}
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
15a0befa
...
...
@@ -137,7 +137,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow
*
pOldRow
=
taosHashGet
(
hash
,
pRow
->
pObj
,
keySize
);
if
(
pOldRow
!=
NULL
)
{
taosWUnLockLatch
(
pLock
);
sdbFreeRow
(
pSdb
,
pRow
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
terrno
=
TSDB_CODE_SDB_OBJ_ALREADY_THERE
;
return
terrno
;
}
...
...
@@ -148,7 +148,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
if
(
taosHashPut
(
hash
,
pRow
->
pObj
,
keySize
,
&
pRow
,
sizeof
(
void
*
))
!=
0
)
{
taosWUnLockLatch
(
pLock
);
sdbFreeRow
(
pSdb
,
pRow
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
terrno
=
TSDB_CODE_SDB_OBJ_ALREADY_THERE
;
return
terrno
;
}
...
...
@@ -164,7 +164,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosWLockLatch
(
pLock
);
taosHashRemove
(
hash
,
pRow
->
pObj
,
keySize
);
taosWUnLockLatch
(
pLock
);
sdbFreeRow
(
pSdb
,
pRow
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
terrno
=
code
;
return
terrno
;
}
...
...
@@ -202,7 +202,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
code
=
(
*
updateFp
)(
pSdb
,
pOldRow
->
pObj
,
pNewRow
->
pObj
);
}
sdbFreeRow
(
pSdb
,
pNewRow
);
sdbFreeRow
(
pSdb
,
pNewRow
,
false
);
pSdb
->
tableVer
[
pOldRow
->
type
]
++
;
return
code
;
...
...
@@ -215,7 +215,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow
**
ppOldRow
=
taosHashGet
(
hash
,
pRow
->
pObj
,
keySize
);
if
(
ppOldRow
==
NULL
||
*
ppOldRow
==
NULL
)
{
taosWUnLockLatch
(
pLock
);
sdbFreeRow
(
pSdb
,
pRow
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
terrno
=
TSDB_CODE_SDB_OBJ_NOT_THERE
;
return
terrno
;
}
...
...
@@ -228,7 +228,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosWUnLockLatch
(
pLock
);
pSdb
->
tableVer
[
pOldRow
->
type
]
++
;
sdbFreeRow
(
pSdb
,
pRow
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
sdbCheck
(
pSdb
,
pOldRow
);
// sdbRelease(pSdb, pOldRow->pObj);
...
...
@@ -322,7 +322,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) {
int32_t
ref
=
atomic_load_32
(
&
pRow
->
refCount
);
sdbPrintOper
(
pSdb
,
pRow
,
"check"
);
if
(
ref
<=
0
&&
pRow
->
status
==
SDB_STATUS_DROPPED
)
{
sdbFreeRow
(
pSdb
,
pRow
);
sdbFreeRow
(
pSdb
,
pRow
,
true
);
}
taosRUnLockLatch
(
pLock
);
...
...
@@ -340,7 +340,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
int32_t
ref
=
atomic_sub_fetch_32
(
&
pRow
->
refCount
,
1
);
sdbPrintOper
(
pSdb
,
pRow
,
"release"
);
if
(
ref
<=
0
&&
pRow
->
status
==
SDB_STATUS_DROPPED
)
{
sdbFreeRow
(
pSdb
,
pRow
);
sdbFreeRow
(
pSdb
,
pRow
,
true
);
}
taosRUnLockLatch
(
pLock
);
...
...
source/dnode/mnode/sdb/src/sdbRaw.c
浏览文件 @
15a0befa
...
...
@@ -107,7 +107,9 @@ int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_
return
-
1
;
}
memcpy
(
pRaw
->
pData
+
dataPos
,
pVal
,
valLen
);
if
(
pVal
!=
NULL
)
{
memcpy
(
pRaw
->
pData
+
dataPos
,
pVal
,
valLen
);
}
return
0
;
}
...
...
source/dnode/mnode/sdb/src/sdbRow.c
浏览文件 @
15a0befa
...
...
@@ -36,11 +36,11 @@ void *sdbGetRowObj(SSdbRow *pRow) {
return
pRow
->
pObj
;
}
void
sdbFreeRow
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
)
{
void
sdbFreeRow
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
,
bool
callFunc
)
{
// remove attached object such as trans
SdbDeleteFp
deleteFp
=
pSdb
->
deleteFps
[
pRow
->
type
];
if
(
deleteFp
!=
NULL
)
{
(
*
deleteFp
)(
pSdb
,
pRow
->
pObj
);
(
*
deleteFp
)(
pSdb
,
pRow
->
pObj
,
callFunc
);
}
sdbPrintOper
(
pSdb
,
pRow
,
"free"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录