Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f13b6149
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f13b6149
编写于
11月 09, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/3.0_liaohj
上级
3c8a25aa
d2a6d4ea
变更
22
隐藏空白更改
内联
并排
Showing
22 changed file
with
1143 addition
and
439 deletion
+1143
-439
include/common/taosmsg.h
include/common/taosmsg.h
+35
-30
include/server/mnode/mnode.h
include/server/mnode/mnode.h
+24
-13
include/server/vnode/tq/tq.h
include/server/vnode/tq/tq.h
+0
-3
include/server/vnode/vnode.h
include/server/vnode/vnode.h
+19
-18
include/util/taoserror.h
include/util/taoserror.h
+12
-8
source/dnode/mgmt/inc/dnodeMnode.h
source/dnode/mgmt/inc/dnodeMnode.h
+5
-1
source/dnode/mgmt/src/dnodeMnode.c
source/dnode/mgmt/src/dnodeMnode.c
+434
-132
source/dnode/mgmt/src/dnodeTransport.c
source/dnode/mgmt/src/dnodeTransport.c
+51
-49
source/dnode/mgmt/src/dnodeVnodes.c
source/dnode/mgmt/src/dnodeVnodes.c
+1
-1
source/dnode/mnode/inc/mnodeInt.h
source/dnode/mnode/inc/mnodeInt.h
+0
-1
source/dnode/mnode/inc/mnodeWorker.h
source/dnode/mnode/inc/mnodeWorker.h
+0
-1
source/dnode/mnode/src/mnodeTelem.c
source/dnode/mnode/src/mnodeTelem.c
+12
-12
source/dnode/mnode/src/mnodeWorker.c
source/dnode/mnode/src/mnodeWorker.c
+16
-17
source/dnode/mnode/src/mondeInt.c
source/dnode/mnode/src/mondeInt.c
+27
-32
source/dnode/vnode/tq/CMakeLists.txt
source/dnode/vnode/tq/CMakeLists.txt
+4
-0
source/dnode/vnode/tq/inc/tqMetaStore.h
source/dnode/vnode/tq/inc/tqMetaStore.h
+37
-15
source/dnode/vnode/tq/src/tqMetaStore.c
source/dnode/vnode/tq/src/tqMetaStore.c
+305
-102
source/dnode/vnode/tq/test/CMakeLists.txt
source/dnode/vnode/tq/test/CMakeLists.txt
+20
-0
source/dnode/vnode/tq/test/tqMetaTest.cpp
source/dnode/vnode/tq/test/tqMetaTest.cpp
+133
-0
source/dnode/vnode/tq/test/tqTests.cpp
source/dnode/vnode/tq/test/tqTests.cpp
+0
-0
source/os/src/osDir.c
source/os/src/osDir.c
+2
-2
source/util/src/terror.c
source/util/src/terror.c
+6
-2
未找到文件。
include/common/taosmsg.h
浏览文件 @
f13b6149
...
...
@@ -75,9 +75,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_FUNCTION
,
"alter-function"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_FUNCTION
,
"drop-function"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_STABLE
,
"create-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_STABLE_VGROUP
,
"stable-vgroup"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_STABLE
,
"drop-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_STABLE
,
"alter-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_STABLE
,
"drop-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_STABLE_VGROUP
,
"stable-vgroup"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_KILL_QUERY
,
"kill-query"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_KILL_CONN
,
"kill-conn"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_HEARTBEAT
,
"heartbeat"
)
...
...
@@ -108,6 +108,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_SYNC_VNODE_IN
,
"sync-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_COMPACT_VNODE_IN
,
"compact-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_MNODE_IN
,
"create-mnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_MNODE_IN
,
"alter-mnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_MNODE_IN
,
"drop-mnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CONFIG_DNODE_IN
,
"config-dnode"
)
...
...
@@ -700,32 +701,33 @@ typedef struct {
}
SStatusRsp
;
typedef
struct
{
uint16_t
port
;
char
fqdn
[
TSDB_FQDN_LEN
];
}
SVnodeDesc
;
typedef
struct
{
char
db
[
TSDB_FULL_DB_NAME_LEN
];
uint32_t
vgId
;
int32_t
cacheBlockSize
;
int32_t
totalBlocks
;
int32_t
daysPerFile
;
int32_t
daysToKeep0
;
int32_t
daysToKeep1
;
int32_t
daysToKeep2
;
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
fsyncPeriod
;
int8_t
reserved
[
16
];
int8_t
precision
;
int8_t
compression
;
int8_t
cacheLastRow
;
int8_t
update
;
int8_t
walLevel
;
int8_t
replica
;
int8_t
quorum
;
int8_t
selfIndex
;
SVnodeDesc
replicas
[
TSDB_MAX_REPLICA
];
int32_t
id
;
uint16_t
port
;
// node sync Port
char
fqdn
[
TSDB_FQDN_LEN
];
// node FQDN
}
SReplica
;
typedef
struct
{
char
db
[
TSDB_FULL_DB_NAME_LEN
];
uint32_t
vgId
;
int32_t
cacheBlockSize
;
int32_t
totalBlocks
;
int32_t
daysPerFile
;
int32_t
daysToKeep0
;
int32_t
daysToKeep1
;
int32_t
daysToKeep2
;
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
fsyncPeriod
;
int8_t
reserved
[
16
];
int8_t
precision
;
int8_t
compression
;
int8_t
cacheLastRow
;
int8_t
update
;
int8_t
walLevel
;
int8_t
quorum
;
int8_t
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SCreateVnodeMsg
,
SAlterVnodeMsg
;
typedef
struct
{
...
...
@@ -829,8 +831,11 @@ typedef struct {
}
SCreateDnodeMsg
,
SDropDnodeMsg
;
typedef
struct
{
int32_t
dnodeId
;
}
SCreateMnodeMsg
,
SDropMnodeMsg
;
int32_t
dnodeId
;
int8_t
replica
;
int8_t
reserved
[
3
];
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SCreateMnodeMsg
,
SAlterMnodeMsg
,
SDropMnodeMsg
;
typedef
struct
{
int32_t
dnodeId
;
...
...
include/server/mnode/mnode.h
浏览文件 @
f13b6149
...
...
@@ -20,6 +20,16 @@
extern
"C"
{
#endif
typedef
enum
{
MN_MSG_TYPE_WRITE
=
1
,
MN_MSG_TYPE_APPLY
,
MN_MSG_TYPE_SYNC
,
MN_MSG_TYPE_READ
}
EMnMsgType
;
typedef
struct
SMnodeMsg
SMnodeMsg
;
typedef
struct
{
int8_t
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SMnodeCfg
;
typedef
struct
{
int64_t
numOfDnode
;
int64_t
numOfMnode
;
...
...
@@ -31,32 +41,33 @@ typedef struct {
int64_t
totalPoints
;
int64_t
totalStorage
;
int64_t
compStorage
;
}
SMnode
Stat
;
}
SMnode
Load
;
typedef
struct
{
int32_t
dnodeId
;
int64_t
clusterId
;
void
(
*
SendMsgToDnode
)(
struct
SEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
);
void
(
*
SendMsgToMnode
)(
struct
SRpcMsg
*
rpcMsg
);
void
(
*
SendRedirectMsg
)(
struct
SRpcMsg
*
rpcMsg
,
bool
forShell
);
void
(
*
GetDnodeEp
)(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
}
SMnodeFp
;
typedef
struct
{
SMnodeFp
fp
;
int64_t
clusterId
;
int32_t
dnodeId
;
int32_t
(
*
PutMsgIntoApplyQueue
)(
SMnodeMsg
*
pMsg
);
}
SMnodePara
;
int32_t
mnodeInit
(
SMnodePara
para
);
void
mnodeCleanup
();
int32_t
mnodeDeploy
();
void
mnodeUnDeploy
();
int32_t
mnodeStart
();
int32_t
mnodeDeploy
(
char
*
path
,
SMnodeCfg
*
pCfg
);
void
mnodeUnDeploy
(
char
*
path
);
int32_t
mnodeStart
(
char
*
path
,
SMnodeCfg
*
pCfg
);
int32_t
mnodeAlter
(
SMnodeCfg
*
pCfg
);
void
mnodeStop
();
int32_t
mnodeGet
Statistics
(
SMnodeStat
*
stat
);
int32_t
mnodeGet
Load
(
SMnodeLoad
*
pLoad
);
int32_t
mnodeRetriveAuth
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
void
mnodeProcessMsg
(
SRpcMsg
*
rpcMsg
);
SMnodeMsg
*
mnodeInitMsg
(
int32_t
msgNum
);
int32_t
mnodeAppendMsg
(
SMnodeMsg
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
mnodeCleanupMsg
(
SMnodeMsg
*
pMsg
);
void
mnodeProcessMsg
(
SMnodeMsg
*
pMsg
,
EMnMsgType
msgType
);
#ifdef __cplusplus
}
...
...
include/server/vnode/tq/tq.h
浏览文件 @
f13b6149
...
...
@@ -19,9 +19,6 @@
#include "os.h"
#include "tutil.h"
#define TQ_ACTION_INSERT 0x7f7f7f7fULL
#define TQ_ACTION_DELETE 0x80808080ULL
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
include/server/vnode/vnode.h
浏览文件 @
f13b6149
...
...
@@ -27,24 +27,25 @@ extern "C" {
typedef
struct
SVnode
SVnode
;
typedef
struct
{
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int32_t
cacheBlockSize
;
// MB
int32_t
totalBlocks
;
int32_t
daysPerFile
;
int32_t
daysToKeep0
;
int32_t
daysToKeep1
;
int32_t
daysToKeep2
;
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int8_t
precision
;
// time resolution
int8_t
compression
;
int8_t
cacheLastRow
;
int8_t
update
;
int8_t
quorum
;
int8_t
replica
;
int8_t
walLevel
;
int32_t
fsyncPeriod
;
// millisecond
SVnodeDesc
replicas
[
TSDB_MAX_REPLICA
];
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int32_t
cacheBlockSize
;
// MB
int32_t
totalBlocks
;
int32_t
daysPerFile
;
int32_t
daysToKeep0
;
int32_t
daysToKeep1
;
int32_t
daysToKeep2
;
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int8_t
precision
;
// time resolution
int8_t
compression
;
int8_t
cacheLastRow
;
int8_t
update
;
int8_t
quorum
;
int8_t
replica
;
int8_t
selfIndex
;
int8_t
walLevel
;
int32_t
fsyncPeriod
;
// millisecond
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SVnodeCfg
;
typedef
enum
{
...
...
include/util/taoserror.h
浏览文件 @
f13b6149
...
...
@@ -216,14 +216,18 @@ int32_t* taosGetErrno();
// dnode
#define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed")
#define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory")
#define TSDB_CODE_DND_DNODE_ID_NOT_MATCHED TAOS_DEF_ERROR_CODE(0, 0x0402) //"Dnode Id not matched")
#define TSDB_CODE_DND_MNODE_ALREADY_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0403) //"Mnode already deployed")
#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0404) //"No permission for disk files in dnode")
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0405) //"Invalid message length")
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress")
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories")
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting"
#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0409) //"Parse vnodes.json error")
#define TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE TAOS_DEF_ERROR_CODE(0, 0x0402) //"Mnode Id not match Dnode")
#define TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403) //"Mnode already deployed")
#define TSDB_CODE_DND_MNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404) //"Mnode not deployed")
#define TSDB_CODE_DND_READ_MNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0405) //"Read mnode.json error")
#define TSDB_CODE_DND_WRITE_MNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0406) //"Write mnode.json error")
#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0407) //"No permission for disk files in dnode")
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0408) //"Invalid message length")
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0409) //"Action in progress")
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x040A) //"Too many vnode directories")
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x040B) //"Dnode is exiting"
#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x040C) //"Parse vnodes.json error")
#define TSDB_CODE_DND_PARSE_DNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x040D) //"Parse dnodes.json error")
// vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
...
...
source/dnode/mgmt/inc/dnodeMnode.h
浏览文件 @
f13b6149
...
...
@@ -23,9 +23,13 @@ extern "C" {
int32_t
dnodeInitMnode
();
void
dnodeCleanupMnode
();
void
dnodeProcessMnodeMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
int32_t
dnodeGetUserAuthFromMnode
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
void
dnodeProcessMnodeMgmtMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dnodeProcessMnodeReadMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dnodeProcessMnodeWriteMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dnodeProcessMnodeSyncMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mgmt/src/dnodeMnode.c
浏览文件 @
f13b6149
...
...
@@ -15,73 +15,120 @@
#define _DEFAULT_SOURCE
#include "dnodeMnode.h"
#include "cJSON.h"
#include "dnodeDnode.h"
#include "dnodeTransport.h"
#include "cJSON.h"
#include "mnode.h"
#include "tlockfree.h"
#include "tqueue.h"
#include "tstep.h"
#include "tworker.h"
static
struct
{
int8_t
deployed
;
int8_t
dropped
;
char
file
[
PATH_MAX
+
20
];
pthread_mutex_t
mutex
;
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SWorkerPool
mgmtPool
;
SWorkerPool
readPool
;
SWorkerPool
writePool
;
SWorkerPool
syncPool
;
taos_queue
pReadQ
;
taos_queue
pWriteQ
;
taos_queue
pApplyQ
;
taos_queue
pSyncQ
;
taos_queue
pMgmtQ
;
SSteps
*
pSteps
;
SRWLatch
latch
;
}
tsMnode
=
{
0
};
static
int32_t
dnodeReadMnode
()
{
static
int32_t
dnodeAllocMnodeReadQueue
();
static
void
dnodeFreeMnodeReadQueue
();
static
int32_t
dnodeAllocMnodeWriteQueue
();
static
void
dnodeFreeMnodeWriteQueue
();
static
int32_t
dnodeAllocMnodeApplyQueue
();
static
void
dnodeFreeMnodeApplyQueue
();
static
int32_t
dnodeAllocMnodeSyncQueue
();
static
void
dnodeFreeMnodeSyncQueue
();
static
int32_t
dnodeAcquireMnode
()
{
taosRLockLatch
(
&
tsMnode
.
latch
);
int32_t
code
=
tsMnode
.
deployed
?
0
:
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
;
if
(
code
==
0
)
{
atomic_add_fetch_32
(
&
tsMnode
.
refCount
,
1
);
}
taosRUnLockLatch
(
&
tsMnode
.
latch
);
return
code
;
}
static
void
dnodeReleaseMnode
()
{
atomic_sub_fetch_32
(
&
tsMnode
.
refCount
,
1
);
}
static
int32_t
dnodeReadMnodeFile
()
{
int32_t
code
=
TSDB_CODE_DND_READ_MNODE_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
300
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
char
file
[
PATH_MAX
+
20
]
=
{
0
};
fp
=
fopen
(
tsMnode
.
file
,
"r"
);
snprintf
(
file
,
sizeof
(
file
),
"%s/mnode.json"
,
tsDnodeDir
);
fp
=
fopen
(
file
,
"r"
);
if
(
!
fp
)
{
dDebug
(
"file %s not exist"
,
tsMnode
.
file
);
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
goto
PRASE_MNODE_OVER
;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
tsMnode
.
file
);
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_MNODE_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
tsMnode
.
file
);
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_MNODE_OVER
;
}
cJSON
*
deployed
=
cJSON_GetObjectItem
(
root
,
"deployed"
);
if
(
!
deployed
||
deployed
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since deployed not found"
,
tsMnode
.
file
);
dError
(
"failed to read %s since deployed not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
tsMnode
.
deployed
=
atoi
(
deployed
->
valuestring
);
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dropped not found"
,
tsMnode
.
file
);
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
tsMnode
.
dropped
=
atoi
(
dropped
->
valuestring
);
dInfo
(
"succcessed to read file %s"
,
tsMnode
.
file
);
code
=
0
;
dInfo
(
"succcessed to read file %s"
,
file
);
PRASE_MNODE_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
return
0
;
return
code
;
}
static
int32_t
dnodeWriteMnode
()
{
FILE
*
fp
=
fopen
(
tsMnode
.
file
,
"w"
);
static
int32_t
dnodeWriteMnodeFile
()
{
char
file
[
PATH_MAX
+
20
]
=
{
0
};
char
realfile
[
PATH_MAX
+
20
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%s/mnode.json.bak"
,
tsDnodeDir
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s/mnode.json"
,
tsDnodeDir
);
FILE
*
fp
=
fopen
(
file
,
"w"
);
if
(
!
fp
)
{
dError
(
"failed to write %s since %s"
,
tsMnode
.
file
,
strerror
(
errno
));
return
-
1
;
dError
(
"failed to write %s since %s"
,
file
,
strerror
(
errno
));
return
TSDB_CODE_DND_WRITE_MNODE_FILE_ERROR
;
}
int32_t
len
=
0
;
...
...
@@ -97,197 +144,452 @@ static int32_t dnodeWriteMnode() {
taosFsyncFile
(
fileno
(
fp
));
fclose
(
fp
);
free
(
content
);
terrno
=
0
;
dInfo
(
"successed to write %s"
,
tsMnode
.
file
);
int32_t
code
=
taosRenameFile
(
file
,
realfile
);
if
(
code
!=
0
)
{
dError
(
"failed to rename %s since %s"
,
file
,
tstrerror
(
code
));
return
TSDB_CODE_DND_WRITE_MNODE_FILE_ERROR
;
}
dInfo
(
"successed to write %s"
,
realfile
);
return
0
;
}
static
int32_t
dnodeStartMnode
(
SCreateMnodeMsg
*
pCfg
)
{
int32_t
code
=
0
;
static
int32_t
dnodeStartMnode
()
{
int32_t
code
=
dnodeAllocMnodeReadQueue
();
if
(
code
!=
0
)
{
return
code
;
}
if
(
pCfg
->
dnodeId
!=
dnodeGetDnodeId
())
{
code
=
TSDB_CODE_DND_DNODE_ID_NOT_MATCHED
;
dError
(
"failed to start mnode since %s"
,
tstrerror
(
code
));
code
=
dnodeAllocMnodeWriteQueue
();
if
(
code
!=
0
)
{
return
code
;
}
if
(
tsMnode
.
dropped
)
{
code
=
TSDB_CODE_DND_MNODE_ALREADY_DROPPED
;
dError
(
"failed to start mnode since %s"
,
tstrerror
(
code
));
code
=
dnodeAllocMnodeApplyQueue
();
if
(
code
!=
0
)
{
return
code
;
}
if
(
tsMnode
.
deployed
)
{
dError
(
"failed to start mnode since its already deployed"
);
return
0
;
code
=
dnodeAllocMnodeSyncQueue
();
if
(
code
!=
0
)
{
return
code
;
}
taosWLockLatch
(
&
tsMnode
.
latch
);
tsMnode
.
deployed
=
1
;
t
sMnode
.
dropped
=
0
;
t
aosWUnLockLatch
(
&
tsMnode
.
latch
)
;
code
=
dnodeWriteMnode
();
return
code
;
}
static
void
dnodeStopMnode
()
{
taosWLockLatch
(
&
tsMnode
.
latch
);
tsMnode
.
deployed
=
0
;
taosWUnLockLatch
(
&
tsMnode
.
latch
);
dnodeReleaseMnode
();
while
(
tsMnode
.
refCount
>
0
)
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
tsMnode
.
pReadQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
tsMnode
.
pApplyQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
tsMnode
.
pWriteQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
tsMnode
.
pSyncQ
))
taosMsleep
(
10
);
dnodeFreeMnodeReadQueue
();
dnodeFreeMnodeWriteQueue
();
dnodeFreeMnodeApplyQueue
();
dnodeFreeMnodeSyncQueue
();
}
static
int32_t
dnodeUnDeployMnode
()
{
tsMnode
.
dropped
=
1
;
int32_t
code
=
dnodeWriteMnodeFile
();
if
(
code
!=
0
)
{
tsMnode
.
dropped
=
0
;
dError
(
"failed to undeploy mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
dnodeStopMnode
();
mnodeUnDeploy
(
tsMnodeDir
);
dnodeWriteMnodeFile
();
return
code
;
}
static
int32_t
dnodeDeployMnode
(
SMnodeCfg
*
pCfg
)
{
int32_t
code
=
mnodeDeploy
(
tsMnodeDir
,
pCfg
);
if
(
code
!=
0
)
{
tsMnode
.
deployed
=
0
;
dError
(
"failed to start mnode since %s"
,
tstrerror
(
code
));
dError
(
"failed to deploy mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
code
=
mnodeDeploy
();
code
=
dnodeStartMnode
();
if
(
code
!=
0
)
{
tsMnode
.
deployed
=
0
;
dError
(
"failed to
start
mnode since %s"
,
tstrerror
(
code
));
dnodeUnDeployMnode
()
;
dError
(
"failed to
deploy
mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
code
=
mnodeStart
();
code
=
dnodeWriteMnodeFile
();
if
(
code
!=
0
)
{
tsMnode
.
deployed
=
0
;
dError
(
"failed to
start
mnode since %s"
,
tstrerror
(
code
));
dnodeUnDeployMnode
()
;
dError
(
"failed to
deploy
mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
tsMnode
.
deployed
=
1
;
return
0
;
dInfo
(
"deploy mnode success"
)
;
return
code
;
}
static
int32_t
dnodeDropMnode
(
SDropMnodeMsg
*
pCfg
)
{
int32_t
code
=
0
;
static
int32_t
dnodeAlterMnode
(
SMnodeCfg
*
pCfg
)
{
int32_t
code
=
dnodeAcquireMnode
();
if
(
code
==
0
)
{
code
=
mnodeAlter
(
pCfg
);
dnodeReleaseMnode
();
}
return
code
;
}
static
SCreateMnodeMsg
*
dnodeParseCreateMnodeMsg
(
SRpcMsg
*
pRpcMsg
)
{
SCreateMnodeMsg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
for
(
int32_t
i
=
0
;
i
<
pMsg
->
replica
;
++
i
)
{
pMsg
->
replicas
[
i
].
port
=
htons
(
pMsg
->
replicas
[
i
].
port
);
}
return
pMsg
;
}
static
int32_t
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pRpcMsg
)
{
SAlterMnodeMsg
*
pMsg
=
(
SAlterMnodeMsg
*
)
dnodeParseCreateMnodeMsg
(
pRpcMsg
->
pCont
);
if
(
pMsg
->
dnodeId
!=
dnodeGetDnodeId
())
{
return
TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE
;
}
else
{
SMnodeCfg
cfg
=
{
0
};
cfg
.
replica
=
pMsg
->
replica
;
memcpy
(
cfg
.
replicas
,
pMsg
->
replicas
,
sizeof
(
SReplica
)
*
sizeof
(
TSDB_MAX_REPLICA
));
return
dnodeDeployMnode
(
&
cfg
);
}
}
static
int32_t
dnodeProcessAlterMnodeReq
(
SRpcMsg
*
pRpcMsg
)
{
SAlterMnodeMsg
*
pMsg
=
(
SAlterMnodeMsg
*
)
dnodeParseCreateMnodeMsg
(
pRpcMsg
->
pCont
);
if
(
pMsg
->
dnodeId
!=
dnodeGetDnodeId
())
{
return
TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE
;
}
else
{
SMnodeCfg
cfg
=
{
0
};
cfg
.
replica
=
pMsg
->
replica
;
memcpy
(
cfg
.
replicas
,
pMsg
->
replicas
,
sizeof
(
SReplica
)
*
sizeof
(
TSDB_MAX_REPLICA
));
return
dnodeAlterMnode
(
&
cfg
);
}
}
static
int32_t
dnodeProcessDropMnodeReq
(
SRpcMsg
*
pMsg
)
{
SAlterMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
if
(
pCfg
->
dnodeId
!=
dnodeGetDnodeId
())
{
code
=
TSDB_CODE_DND_DNODE_ID_NOT_MATCHED
;
dError
(
"failed to drop mnode since %s"
,
tstrerror
(
code
));
return
code
;
return
TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE
;
}
else
{
return
dnodeUnDeployMnode
()
;
}
}
if
(
tsMnode
.
dropped
)
{
code
=
TSDB_CODE_DND_MNODE_ALREADY_DROPPED
;
dError
(
"failed to drop mnode since %s"
,
tstrerror
(
code
));
return
code
;
static
void
dnodeProcessMnodeMgmtQueue
(
void
*
unused
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
switch
(
pMsg
->
msgType
)
{
case
TSDB_MSG_TYPE_CREATE_MNODE_IN
:
code
=
dnodeProcessCreateMnodeReq
(
pMsg
);
break
;
case
TSDB_MSG_TYPE_ALTER_MNODE_IN
:
code
=
dnodeProcessAlterMnodeReq
(
pMsg
);
break
;
case
TSDB_MSG_TYPE_DROP_MNODE_IN
:
code
=
dnodeProcessDropMnodeReq
(
pMsg
);
break
;
default:
code
=
TSDB_CODE_DND_MSG_NOT_PROCESSED
;
break
;
}
if
(
!
tsMnode
.
deployed
)
{
dError
(
"failed to drop mnode since not deployed"
);
return
0
;
SRpcMsg
rsp
=
{.
code
=
code
,
.
handle
=
pMsg
->
handle
};
rpcSendResponse
(
&
rsp
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
void
dnodeProcessMnodeReadQueue
(
void
*
unused
,
SMnodeMsg
*
pMsg
)
{
mnodeProcessMsg
(
pMsg
,
MN_MSG_TYPE_READ
);
}
static
void
dnodeProcessMnodeWriteQueue
(
void
*
unused
,
SMnodeMsg
*
pMsg
)
{
mnodeProcessMsg
(
pMsg
,
MN_MSG_TYPE_WRITE
);
}
static
void
dnodeProcessMnodeApplyQueue
(
void
*
unused
,
SMnodeMsg
*
pMsg
)
{
mnodeProcessMsg
(
pMsg
,
MN_MSG_TYPE_APPLY
);
}
static
void
dnodeProcessMnodeSyncQueue
(
void
*
unused
,
SMnodeMsg
*
pMsg
)
{
mnodeProcessMsg
(
pMsg
,
MN_MSG_TYPE_SYNC
);
}
static
int32_t
dnodeWriteMnodeMsgToQueue
(
taos_queue
pQueue
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
code
=
0
;
if
(
pQueue
==
NULL
)
{
code
=
TSDB_CODE_DND_MSG_NOT_PROCESSED
;
}
else
{
SMnodeMsg
*
pMsg
=
mnodeInitMsg
(
1
);
if
(
pMsg
==
NULL
)
{
code
=
TSDB_CODE_DND_OUT_OF_MEMORY
;
}
else
{
mnodeAppendMsg
(
pMsg
,
pRpcMsg
);
code
=
taosWriteQitem
(
pQueue
,
pMsg
);
}
}
mnodeStop
();
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
SRpcMsg
rsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
code
};
rpcSendResponse
(
&
rsp
);
rpcFreeCont
(
pRpcMsg
->
pCont
);
}
}
tsMnode
.
deployed
=
0
;
tsMnode
.
dropped
=
1
;
void
dnodeProcessMnodeMgmtMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
dnodeWriteMnodeMsgToQueue
(
tsMnode
.
pMgmtQ
,
pMsg
);
}
code
=
dnodeWriteMnode
();
if
(
code
!
=
0
)
{
tsMnode
.
deployed
=
1
;
tsMnode
.
dropped
=
0
;
dError
(
"failed to drop mnode since %s"
,
tstrerror
(
code
));
return
code
;
void
dnodeProcessMnodeWriteMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
if
(
dnodeAcquireMnode
()
=
=
0
)
{
dnodeWriteMnodeMsgToQueue
(
tsMnode
.
pWriteQ
,
pMsg
)
;
dnodeReleaseMnode
()
;
}
else
{
dnodeSendRedirectMsg
(
pMsg
,
0
)
;
}
}
mnodeUnDeploy
();
void
dnodeProcessMnodeSyncMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
int32_t
code
=
dnodeAcquireMnode
();
if
(
code
==
0
)
{
dnodeWriteMnodeMsgToQueue
(
tsMnode
.
pSyncQ
,
pMsg
);
dnodeReleaseMnode
();
}
else
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
handle
,
.
code
=
code
};
rpcSendResponse
(
&
rsp
);
rpcFreeCont
(
pMsg
->
pCont
);
}
}
tsMnode
.
deployed
=
0
;
void
dnodeProcessMnodeReadMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
if
(
dnodeAcquireMnode
()
==
0
)
{
dnodeWriteMnodeMsgToQueue
(
tsMnode
.
pReadQ
,
pMsg
);
dnodeReleaseMnode
();
}
else
{
dnodeSendRedirectMsg
(
pMsg
,
0
);
}
}
static
int32_t
dnodePutMsgIntoMnodeApplyQueue
(
SMnodeMsg
*
pMsg
)
{
int32_t
code
=
dnodeAcquireMnode
();
if
(
code
!=
0
)
return
code
;
code
=
taosWriteQitem
(
tsMnode
.
pApplyQ
,
pMsg
);
dnodeReleaseMnode
();
return
code
;
}
static
int32_t
dnodeAllocMnodeMgmtQueue
()
{
tsMnode
.
pMgmtQ
=
tWorkerAllocQueue
(
&
tsMnode
.
mgmtPool
,
NULL
,
(
FProcessItem
)
dnodeProcessMnodeMgmtQueue
);
if
(
tsMnode
.
pMgmtQ
==
NULL
)
{
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
}
return
0
;
}
static
void
dnodeProcessCreateMnodeReq
(
SRpcMsg
*
pMsg
)
{
SCreateMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
static
void
dnodeFreeMnodeMgmtQueue
()
{
tWorkerFreeQueue
(
&
tsMnode
.
mgmtPool
,
tsMnode
.
pMgmtQ
);
tsMnode
.
pMgmtQ
=
NULL
;
}
int32_t
code
=
dnodeStartMnode
(
pCfg
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
static
int32_t
dnodeInitMnodeMgmtWorker
()
{
SWorkerPool
*
pPool
=
&
tsMnode
.
mgmtPool
;
pPool
->
name
=
"mnode-mgmt"
;
pPool
->
min
=
1
;
pPool
->
max
=
1
;
return
tWorkerInit
(
pPool
);
}
static
void
dnodeProcessDropMnodeReq
(
SRpcMsg
*
pMsg
)
{
SDropMnodeMsg
*
pCfg
=
pMsg
->
pCont
;
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
static
void
dnodeCleanupMnodeMgmtWorker
()
{
tWorkerCleanup
(
&
tsMnode
.
mgmtPool
);
}
int32_t
code
=
dnodeDropMnode
(
pCfg
);
SRpcMsg
rspMsg
=
{.
handle
=
pMsg
->
handle
,
.
code
=
code
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
static
int32_t
dnodeAllocMnodeReadQueue
()
{
tsMnode
.
pReadQ
=
tWorkerAllocQueue
(
&
tsMnode
.
readPool
,
NULL
,
(
FProcessItem
)
dnodeProcessMnodeReadQueue
);
if
(
tsMnode
.
pReadQ
==
NULL
)
{
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
}
return
0
;
}
static
bool
dnodeNeedDeployMnode
()
{
if
(
dnodeGetDnodeId
()
>
0
)
return
false
;
if
(
dnodeGetClusterId
()
>
0
)
return
false
;
if
(
strcmp
(
tsFirst
,
tsLocalEp
)
!=
0
)
return
false
;
return
true
;
static
void
dnodeFreeMnodeReadQueue
()
{
tWorkerFreeQueue
(
&
tsMnode
.
readPool
,
tsMnode
.
pReadQ
);
tsMnode
.
pReadQ
=
NULL
;
}
int32_t
dnodeInitMnode
()
{
tsMnode
.
dropped
=
0
;
tsMnode
.
deployed
=
0
;
snprintf
(
tsMnode
.
file
,
sizeof
(
tsMnode
.
file
),
"%s/mnode.json"
,
tsDnodeDir
);
static
int32_t
dnodeInitMnodeReadWorker
()
{
SWorkerPool
*
pPool
=
&
tsMnode
.
readPool
;
pPool
->
name
=
"mnode-read"
;
pPool
->
min
=
0
;
pPool
->
max
=
1
;
return
tWorkerInit
(
pPool
);
}
static
void
dnodeCleanupMnodeReadWorker
()
{
tWorkerCleanup
(
&
tsMnode
.
readPool
);
}
static
int32_t
dnodeAllocMnodeWriteQueue
()
{
tsMnode
.
pWriteQ
=
tWorkerAllocQueue
(
&
tsMnode
.
writePool
,
NULL
,
(
FProcessItem
)
dnodeProcessMnodeWriteQueue
);
if
(
tsMnode
.
pWriteQ
==
NULL
)
{
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
}
return
0
;
}
static
void
dnodeFreeMnodeWriteQueue
()
{
tWorkerFreeQueue
(
&
tsMnode
.
writePool
,
tsMnode
.
pWriteQ
);
tsMnode
.
pWriteQ
=
NULL
;
}
static
int32_t
dnodeAllocMnodeApplyQueue
()
{
tsMnode
.
pApplyQ
=
tWorkerAllocQueue
(
&
tsMnode
.
writePool
,
NULL
,
(
FProcessItem
)
dnodeProcessMnodeApplyQueue
);
if
(
tsMnode
.
pApplyQ
==
NULL
)
{
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
}
return
0
;
}
static
void
dnodeFreeMnodeApplyQueue
()
{
tWorkerFreeQueue
(
&
tsMnode
.
writePool
,
tsMnode
.
pApplyQ
);
tsMnode
.
pApplyQ
=
NULL
;
}
static
int32_t
dnodeInitMnodeWriteWorker
()
{
SWorkerPool
*
pPool
=
&
tsMnode
.
writePool
;
pPool
->
name
=
"mnode-write"
;
pPool
->
min
=
0
;
pPool
->
max
=
1
;
return
tWorkerInit
(
pPool
);
}
static
void
dnodeCleanupMnodeWriteWorker
()
{
tWorkerCleanup
(
&
tsMnode
.
writePool
);
}
static
int32_t
dnodeAllocMnodeSyncQueue
()
{
tsMnode
.
pSyncQ
=
tWorkerAllocQueue
(
&
tsMnode
.
syncPool
,
NULL
,
(
FProcessItem
)
dnodeProcessMnodeSyncQueue
);
if
(
tsMnode
.
pSyncQ
==
NULL
)
{
return
TSDB_CODE_DND_OUT_OF_MEMORY
;
}
return
0
;
}
static
void
dnodeFreeMnodeSyncQueue
()
{
tWorkerFreeQueue
(
&
tsMnode
.
syncPool
,
tsMnode
.
pSyncQ
);
tsMnode
.
pSyncQ
=
NULL
;
}
static
int32_t
dnodeInitMnodeSyncWorker
()
{
SWorkerPool
*
pPool
=
&
tsMnode
.
syncPool
;
pPool
->
name
=
"mnode-sync"
;
pPool
->
min
=
0
;
pPool
->
max
=
1
;
return
tWorkerInit
(
pPool
);
}
static
void
dnodeCleanupMnodeSyncWorker
()
{
tWorkerCleanup
(
&
tsMnode
.
syncPool
);
}
static
int32_t
dnodeInitMnodeModule
()
{
taosInitRWLatch
(
&
tsMnode
.
latch
);
SMnodePara
para
;
para
.
fp
.
GetDnodeEp
=
dnodeGetDnodeEp
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
SendRedirectMsg
=
dnodeSendRedirectMsg
;
para
.
dnodeId
=
dnodeGetDnodeId
();
para
.
clusterId
=
dnodeGetClusterId
();
para
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
SendRedirectMsg
=
dnodeSendRedirectMsg
;
int32_t
code
=
mnodeInit
(
para
);
if
(
code
!=
0
)
{
dError
(
"failed to init mnode module since %s"
,
tstrerror
(
code
));
return
code
;
}
return
mnodeInit
(
para
);
}
code
=
dnodeReadMnode
();
static
void
dnodeCleanupMnodeModule
()
{
mnodeCleanup
();
}
static
bool
dnodeNeedDeployMnode
()
{
if
(
dnodeGetDnodeId
()
>
0
)
return
false
;
if
(
dnodeGetClusterId
()
>
0
)
return
false
;
if
(
strcmp
(
tsFirst
,
tsLocalEp
)
!=
0
)
return
false
;
return
true
;
}
static
int32_t
dnodeOpenMnode
()
{
int32_t
code
=
dnodeReadMnodeFile
();
if
(
code
!=
0
)
{
dError
(
"failed to read
file:%s since %s"
,
tsMnode
.
file
,
tstrerror
(
code
));
dError
(
"failed to read
open mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
if
(
tsMnode
.
dropped
)
{
dError
(
"mnode already dropped, undeploy it"
);
mnodeUnDeploy
();
return
0
;
dInfo
(
"mnode already dropped, undeploy it"
);
return
dnodeUnDeployMnode
();
}
if
(
!
tsMnode
.
deployed
)
{
bool
needDeploy
=
dnodeNeedDeployMnode
();
if
(
needDeploy
)
{
code
=
mnodeDeploy
();
}
else
{
return
0
;
}
if
(
code
!=
0
)
{
dError
(
"failed to deploy mnode since %s"
,
tstrerror
(
code
));
return
code
;
}
tsMnode
.
deployed
=
1
;
if
(
!
needDeploy
)
return
0
;
dInfo
(
"start to deploy mnode"
);
SMnodeCfg
cfg
=
{.
replica
=
1
};
cfg
.
replicas
[
0
].
port
=
tsServerPort
;
tstrncpy
(
cfg
.
replicas
[
0
].
fqdn
,
tsLocalFqdn
,
TSDB_FQDN_LEN
);
return
dnodeDeployMnode
(
&
cfg
);
}
else
{
dInfo
(
"start to open mnode"
);
return
dnodeStartMnode
();
}
return
mnodeStart
();
}
void
dnodeCleanup
Mnode
()
{
if
(
tsMnode
.
deployed
)
{
mnodeStop
();
static
void
dnodeClose
Mnode
()
{
if
(
dnodeAcquireMnode
()
==
0
)
{
dnodeStopMnode
();
}
}
mnodeCleanup
();
int32_t
dnodeInitMnode
()
{
dInfo
(
"dnode-mnode start to init"
);
SSteps
*
pSteps
=
taosStepInit
(
6
,
dnodeReportStartup
);
taosStepAdd
(
pSteps
,
"dnode-mnode-env"
,
dnodeInitMnodeModule
,
dnodeCleanupMnodeModule
);
taosStepAdd
(
pSteps
,
"dnode-mnode-mgmt"
,
dnodeInitMnodeMgmtWorker
,
dnodeCleanupMnodeMgmtWorker
);
taosStepAdd
(
pSteps
,
"dnode-mnode-read"
,
dnodeInitMnodeReadWorker
,
dnodeCleanupMnodeReadWorker
);
taosStepAdd
(
pSteps
,
"dnode-mnode-write"
,
dnodeInitMnodeWriteWorker
,
dnodeCleanupMnodeWriteWorker
);
taosStepAdd
(
pSteps
,
"dnode-mnode-sync"
,
dnodeInitMnodeSyncWorker
,
dnodeCleanupMnodeSyncWorker
);
taosStepAdd
(
pSteps
,
"dnode-mnode"
,
dnodeOpenMnode
,
dnodeCloseMnode
);
tsMnode
.
pSteps
=
pSteps
;
int32_t
code
=
taosStepExec
(
pSteps
);
if
(
code
!=
0
)
{
dError
(
"dnode-mnode init failed since %s"
,
tstrerror
(
code
));
}
else
{
dInfo
(
"dnode-mnode is initialized"
);
}
}
void
dnodeProcessMnodeMsg
(
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
switch
(
pMsg
->
msgType
)
{
case
TSDB_MSG_TYPE_CREATE_MNODE_IN
:
dnodeProcessCreateMnodeReq
(
pMsg
);
break
;
case
TSDB_MSG_TYPE_DROP_MNODE_IN
:
dnodeProcessDropMnodeReq
(
pMsg
);
break
;
default:
mnodeProcessMsg
(
pMsg
);
void
dnodeCleanupMnode
()
{
if
(
tsMnode
.
pSteps
==
NULL
)
{
dInfo
(
"dnode-mnode start to clean up"
);
taosStepCleanup
(
tsMnode
.
pSteps
);
tsMnode
.
pSteps
=
NULL
;
dInfo
(
"dnode-mnode is cleaned up"
);
}
}
int32_t
dnodeGetUserAuthFromMnode
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
return
mnodeRetriveAuth
(
user
,
spi
,
encrypt
,
secret
,
ckey
);
int32_t
code
=
dnodeAcquireMnode
();
if
(
code
!=
0
)
{
dTrace
(
"failed to get user auth since mnode not deployed"
);
return
code
;
}
code
=
mnodeRetriveAuth
(
user
,
spi
,
encrypt
,
secret
,
ckey
);
dnodeReleaseMnode
();
return
code
;
}
\ No newline at end of file
source/dnode/mgmt/src/dnodeTransport.c
浏览文件 @
f13b6149
...
...
@@ -51,76 +51,78 @@ static void dnodeInitMsgFp() {
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_MQ_RESET
]
=
dnodeProcessVnodeWriteMsg
;
// msg from client to mnode
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONNECT
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_ACCT
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_ACCT
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_ACCT
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_USER
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_USER
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_USER
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_DNODE
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_DNODE
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_DB
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_DB
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_USE_DB
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_DB
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_DB
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_TOPIC
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_TOPIC
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_TOPIC
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_FUNCTION
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_FUNCTION
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_FUNCTION
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
DROP_STABLE
]
=
dnodeProcessMnod
eMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
ALTER_STABLE
]
=
dnodeProcessMnod
eMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_STABLE_VGROUP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_KILL_QUERY
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_KILL_CONN
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_HEARTBEAT
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW_RETRIEVE
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONNECT
]
=
dnodeProcessMnode
Read
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_ACCT
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_ACCT
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_ACCT
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_USER
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_USER
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_USER
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_DNODE
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_DNODE
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_DB
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_DB
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_USE_DB
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_DB
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_DB
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_TOPIC
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_TOPIC
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_TOPIC
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_FUNCTION
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_FUNCTION
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_FUNCTION
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
ALTER_STABLE
]
=
dnodeProcessMnodeWrit
eMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_
DROP_STABLE
]
=
dnodeProcessMnodeWrit
eMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_STABLE_VGROUP
]
=
dnodeProcessMnode
Read
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_KILL_QUERY
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_KILL_CONN
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_HEARTBEAT
]
=
dnodeProcessMnode
Read
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW
]
=
dnodeProcessMnode
Read
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW_RETRIEVE
]
=
dnodeProcessMnode
Read
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC
]
=
dnodeProcessMnode
Read
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE
]
=
dnodeProcessMnode
Write
Msg
;
// message from client to dnode
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_NETWORK_TEST
]
=
dnodeProcessDnodeMsg
;
// message from mnode to vnode
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN
]
=
dnodeProcessVnodeWriteMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_STABLE_IN
]
=
dnodeProcessVnodeWriteMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_STABLE_IN
]
=
dnodeProcessVnodeWriteMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_STABLE_IN
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_STABLE_IN
]
=
dnodeProcessMnode
Write
Msg
;
// message from mnode to dnode
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_VNODE_IN
]
=
dnodeProcessVnodeMgmtMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_VNODE_IN
]
=
dnodeProcessVnodeMgmtMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_VNODE_IN
]
=
dnodeProcessVnodeMgmtMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_VNODE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_VNODE_IN_RSP
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_VNODE_IN
]
=
dnodeProcessVnodeMgmtMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_AUTH_VNODE_IN
]
=
dnodeProcessVnodeMgmtMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE_IN
]
=
dnodeProcessVnodeMgmtMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_MNODE_IN
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_MNODE_IN
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_MNODE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP
]
=
dnodeProcessMnodeWriteMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_MNODE_IN
]
=
dnodeProcessMnodeMgmtMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP
]
=
dnodeProcessMnodeWriteMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_MNODE_IN
]
=
dnodeProcessMnodeMgmtMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_ALTER_MNODE_IN_RSP
]
=
dnodeProcessMnodeWriteMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_MNODE_IN
]
=
dnodeProcessMnodeMgmtMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_DROP_MNODE_IN_RSP
]
=
dnodeProcessMnodeWriteMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE_IN
]
=
dnodeProcessDnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP
]
=
dnodeProcessMnode
Write
Msg
;
// message from dnode to mnode
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_AUTH
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_AUTH
]
=
dnodeProcessMnode
Read
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_AUTH_RSP
]
=
dnodeProcessDnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_GRANT
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_GRANT
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_GRANT_RSP
]
=
dnodeProcessDnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_STATUS
]
=
dnodeProcessMnodeMsg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_STATUS
]
=
dnodeProcessMnode
Write
Msg
;
tsTrans
.
msgFp
[
TSDB_MSG_TYPE_STATUS_RSP
]
=
dnodeProcessDnodeMsg
;
}
...
...
source/dnode/mgmt/src/dnodeVnodes.c
浏览文件 @
f13b6149
...
...
@@ -983,7 +983,7 @@ static int32_t dnodeInitVnodeModule() {
int32_t
dnodeInitVnodes
()
{
dInfo
(
"dnode-vnodes start to init"
);
SSteps
*
pSteps
=
taosStepInit
(
3
,
dnodeReportStartup
);
SSteps
*
pSteps
=
taosStepInit
(
6
,
dnodeReportStartup
);
taosStepAdd
(
pSteps
,
"dnode-vnode-env"
,
dnodeInitVnodeModule
,
vnodeCleanup
);
taosStepAdd
(
pSteps
,
"dnode-vnode-mgmt"
,
dnodeInitVnodeMgmtWorker
,
dnodeCleanupVnodeMgmtWorker
);
taosStepAdd
(
pSteps
,
"dnode-vnode-read"
,
dnodeInitVnodeReadWorker
,
dnodeCleanupVnodeReadWorker
);
...
...
source/dnode/mnode/inc/mnodeInt.h
浏览文件 @
f13b6149
...
...
@@ -32,7 +32,6 @@ EMnStatus mnodeGetStatus();
void
mnodeSendMsgToDnode
(
struct
SEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
);
void
mnodeSendMsgToMnode
(
struct
SRpcMsg
*
rpcMsg
);
void
mnodeSendRedirectMsg
(
struct
SRpcMsg
*
rpcMsg
,
bool
forShell
);
void
mnodeGetDnodeEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/inc/mnodeWorker.h
浏览文件 @
f13b6149
...
...
@@ -24,7 +24,6 @@ extern "C" {
int32_t
mnodeInitWorker
();
void
mnodeCleanupWorker
();
void
mnodeProcessMsg
(
SRpcMsg
*
rpcMsg
);
void
mnodeSendRsp
(
SMnMsg
*
pMsg
,
int32_t
code
);
void
mnodeReDispatchToWriteQueue
(
SMnMsg
*
pMsg
);
...
...
source/dnode/mnode/src/mnodeTelem.c
浏览文件 @
f13b6149
...
...
@@ -172,21 +172,21 @@ static void mnodeAddVersionInfo(SBufferWriter* bw) {
}
static
void
mnodeAddRuntimeInfo
(
SBufferWriter
*
bw
)
{
SMnode
Stat
stat
=
{
0
};
if
(
mnodeGet
Statistics
(
&
stat
)
!=
0
)
{
SMnode
Load
load
=
{
0
};
if
(
mnodeGet
Load
(
&
load
)
!=
0
)
{
return
;
}
mnodeAddIntField
(
bw
,
"numOfDnode"
,
stat
.
numOfDnode
);
mnodeAddIntField
(
bw
,
"numOfMnode"
,
stat
.
numOfMnode
);
mnodeAddIntField
(
bw
,
"numOfVgroup"
,
stat
.
numOfVgroup
);
mnodeAddIntField
(
bw
,
"numOfDatabase"
,
stat
.
numOfDatabase
);
mnodeAddIntField
(
bw
,
"numOfSuperTable"
,
stat
.
numOfSuperTable
);
mnodeAddIntField
(
bw
,
"numOfChildTable"
,
stat
.
numOfChildTable
);
mnodeAddIntField
(
bw
,
"numOfColumn"
,
stat
.
numOfColumn
);
mnodeAddIntField
(
bw
,
"numOfPoint"
,
stat
.
totalPoints
);
mnodeAddIntField
(
bw
,
"totalStorage"
,
stat
.
totalStorage
);
mnodeAddIntField
(
bw
,
"compStorage"
,
stat
.
compStorage
);
mnodeAddIntField
(
bw
,
"numOfDnode"
,
load
.
numOfDnode
);
mnodeAddIntField
(
bw
,
"numOfMnode"
,
load
.
numOfMnode
);
mnodeAddIntField
(
bw
,
"numOfVgroup"
,
load
.
numOfVgroup
);
mnodeAddIntField
(
bw
,
"numOfDatabase"
,
load
.
numOfDatabase
);
mnodeAddIntField
(
bw
,
"numOfSuperTable"
,
load
.
numOfSuperTable
);
mnodeAddIntField
(
bw
,
"numOfChildTable"
,
load
.
numOfChildTable
);
mnodeAddIntField
(
bw
,
"numOfColumn"
,
load
.
numOfColumn
);
mnodeAddIntField
(
bw
,
"numOfPoint"
,
load
.
totalPoints
);
mnodeAddIntField
(
bw
,
"totalStorage"
,
load
.
totalStorage
);
mnodeAddIntField
(
bw
,
"compStorage"
,
load
.
compStorage
);
}
static
void
mnodeSendTelemetryReport
()
{
...
...
source/dnode/mnode/src/mnodeWorker.c
浏览文件 @
f13b6149
...
...
@@ -39,7 +39,7 @@ static struct {
void
(
*
msgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
pMsg
);
}
tsMworker
=
{
0
};
static
SMnMsg
*
mnodeInitMsg
(
SRpcMsg
*
pRpcMsg
)
{
static
SMnMsg
*
mnodeInitMsg
2
(
SRpcMsg
*
pRpcMsg
)
{
int32_t
size
=
sizeof
(
SMnMsg
)
+
pRpcMsg
->
contLen
;
SMnMsg
*
pMsg
=
taosAllocateQitem
(
size
);
...
...
@@ -62,7 +62,7 @@ static SMnMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) {
return
pMsg
;
}
static
void
mnodeCleanupMsg
(
SMnMsg
*
pMsg
)
{
static
void
mnodeCleanupMsg
2
(
SMnMsg
*
pMsg
)
{
if
(
pMsg
==
NULL
)
return
;
if
(
pMsg
->
rpcMsg
.
pCont
!=
pMsg
->
pCont
)
{
tfree
(
pMsg
->
rpcMsg
.
pCont
);
...
...
@@ -75,7 +75,7 @@ static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) {
if
(
mnodeGetStatus
()
!=
MN_STATUS_READY
||
tsMworker
.
writeQ
==
NULL
)
{
mnodeSendRedirectMsg
(
pRpcMsg
,
true
);
}
else
{
SMnMsg
*
pMsg
=
mnodeInitMsg
(
pRpcMsg
);
SMnMsg
*
pMsg
=
mnodeInitMsg
2
(
pRpcMsg
);
if
(
pMsg
==
NULL
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_INVALID_USER
};
rpcSendResponse
(
&
rpcRsp
);
...
...
@@ -91,7 +91,7 @@ static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) {
void
mnodeReDispatchToWriteQueue
(
SMnMsg
*
pMsg
)
{
if
(
mnodeGetStatus
()
!=
MN_STATUS_READY
||
tsMworker
.
writeQ
==
NULL
)
{
mnodeSendRedirectMsg
(
&
pMsg
->
rpcMsg
,
true
);
mnodeCleanupMsg
(
pMsg
);
mnodeCleanupMsg
2
(
pMsg
);
}
else
{
taosWriteQitem
(
tsMworker
.
writeQ
,
pMsg
);
}
...
...
@@ -101,7 +101,7 @@ static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) {
if
(
mnodeGetStatus
()
!=
MN_STATUS_READY
||
tsMworker
.
readQ
==
NULL
)
{
mnodeSendRedirectMsg
(
pRpcMsg
,
true
);
}
else
{
SMnMsg
*
pMsg
=
mnodeInitMsg
(
pRpcMsg
);
SMnMsg
*
pMsg
=
mnodeInitMsg
2
(
pRpcMsg
);
if
(
pMsg
==
NULL
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_INVALID_USER
};
rpcSendResponse
(
&
rpcRsp
);
...
...
@@ -118,7 +118,7 @@ static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) {
if
(
mnodeGetStatus
()
!=
MN_STATUS_READY
||
tsMworker
.
peerReqQ
==
NULL
)
{
mnodeSendRedirectMsg
(
pRpcMsg
,
false
);
}
else
{
SMnMsg
*
pMsg
=
mnodeInitMsg
(
pRpcMsg
);
SMnMsg
*
pMsg
=
mnodeInitMsg
2
(
pRpcMsg
);
if
(
pMsg
==
NULL
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_INVALID_USER
};
rpcSendResponse
(
&
rpcRsp
);
...
...
@@ -133,7 +133,7 @@ static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) {
}
void
mnodeDispatchToPeerRspQueue
(
SRpcMsg
*
pRpcMsg
)
{
SMnMsg
*
pMsg
=
mnodeInitMsg
(
pRpcMsg
);
SMnMsg
*
pMsg
=
mnodeInitMsg
2
(
pRpcMsg
);
if
(
pMsg
==
NULL
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
TSDB_CODE_MND_INVALID_USER
};
rpcSendResponse
(
&
rpcRsp
);
...
...
@@ -162,7 +162,7 @@ void mnodeSendRsp(SMnMsg *pMsg, int32_t code) {
};
rpcSendResponse
(
&
rpcRsp
);
mnodeCleanupMsg
(
pMsg
);
mnodeCleanupMsg
2
(
pMsg
);
}
static
void
mnodeInitMsgFp
()
{
...
...
@@ -405,7 +405,7 @@ static void mnodeProcessPeerRsp(SMnMsg *pMsg, void *unused) {
if
(
!
mnodeIsMaster
())
{
mError
(
"msg:%p, ahandle:%p type:%s not processed for not master"
,
pRpcMsg
,
pRpcMsg
->
ahandle
,
taosMsg
[
msgType
]);
mnodeCleanupMsg
(
pMsg
);
mnodeCleanupMsg
2
(
pMsg
);
}
if
(
tsMworker
.
peerRspFp
[
msgType
])
{
...
...
@@ -414,7 +414,7 @@ static void mnodeProcessPeerRsp(SMnMsg *pMsg, void *unused) {
mError
(
"msg:%p, ahandle:%p type:%s is not processed"
,
pRpcMsg
,
pRpcMsg
->
ahandle
,
taosMsg
[
msgType
]);
}
mnodeCleanupMsg
(
pMsg
);
mnodeCleanupMsg
2
(
pMsg
);
}
int32_t
mnodeInitWorker
()
{
...
...
@@ -486,10 +486,9 @@ void mnodeCleanupWorker() {
mInfo
(
"mnode worker is closed"
);
}
void
mnodeProcessMsg
(
SRpcMsg
*
pMsg
)
{
if
(
tsMworker
.
msgFp
[
pMsg
->
msgType
])
{
(
*
tsMworker
.
msgFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
assert
(
0
);
}
}
SMnodeMsg
*
mnodeInitMsg
(
int32_t
msgNum
)
{
return
NULL
;
}
int32_t
mnodeAppendMsg
(
SMnodeMsg
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
return
0
;
}
void
mnodeCleanupMsg
(
SMnodeMsg
*
pMsg
)
{}
void
mnodeProcessMsg
(
SMnodeMsg
*
pMsg
,
EMnMsgType
msgType
)
{}
source/dnode/mnode/src/mondeInt.c
浏览文件 @
f13b6149
...
...
@@ -37,48 +37,42 @@
#include "mnodeTelem.h"
static
struct
{
int32_t
state
;
int32_t
dnodeId
;
int64_t
clusterId
;
tmr_h
timer
;
S
MnodeFp
fp
;
SSteps
*
steps1
;
S
Steps
*
steps2
;
int32_t
state
;
int32_t
dnodeId
;
int64_t
clusterId
;
tmr_h
timer
;
S
Steps
*
steps1
;
SSteps
*
steps2
;
S
MnodePara
para
;
}
tsMint
;
tmr_h
mnodeGetTimer
()
{
return
tsMint
.
timer
;
}
int32_t
mnodeGetDnodeId
()
{
return
tsMint
.
dnodeId
;
}
int32_t
mnodeGetDnodeId
()
{
return
tsMint
.
para
.
dnodeId
;
}
int64_t
mnodeGetClusterId
()
{
return
tsMint
.
clusterId
;
}
int64_t
mnodeGetClusterId
()
{
return
tsMint
.
para
.
clusterId
;
}
EMnStatus
mnodeGetStatus
()
{
return
tsMint
.
state
;
}
void
mnodeSendMsgToDnode
(
struct
SEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
)
{
(
*
tsMint
.
fp
.
SendMsgToDnode
)(
epSet
,
rpcMsg
);
(
*
tsMint
.
para
.
SendMsgToDnode
)(
epSet
,
rpcMsg
);
}
void
mnodeSendMsgToMnode
(
struct
SRpcMsg
*
rpcMsg
)
{
return
(
*
tsMint
.
fp
.
SendMsgToMnode
)(
rpcMsg
);
}
void
mnodeSendMsgToMnode
(
struct
SRpcMsg
*
rpcMsg
)
{
return
(
*
tsMint
.
para
.
SendMsgToMnode
)(
rpcMsg
);
}
void
mnodeSendRedirectMsg
(
struct
SRpcMsg
*
rpcMsg
,
bool
forShell
)
{
(
*
tsMint
.
fp
.
SendRedirectMsg
)(
rpcMsg
,
forShell
);
}
void
mnodeSendRedirectMsg
(
struct
SRpcMsg
*
rpcMsg
,
bool
forShell
)
{
(
*
tsMint
.
para
.
SendRedirectMsg
)(
rpcMsg
,
forShell
);
}
void
mnodeGetDnodeEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
)
{
(
*
tsMint
.
fp
.
GetDnodeEp
)(
dnodeId
,
ep
,
fqdn
,
port
);
}
int32_t
mnodeGetStatistics
(
SMnodeStat
*
stat
)
{
return
0
;
}
int32_t
mnodeGetLoad
(
SMnodeLoad
*
pLoad
)
{
return
0
;
}
static
int32_t
mnodeSetPara
(
SMnodePara
para
)
{
tsMint
.
fp
=
para
.
fp
;
tsMint
.
dnodeId
=
para
.
dnodeId
;
tsMint
.
clusterId
=
para
.
clusterId
;
tsMint
.
para
=
para
;
if
(
tsMint
.
fp
.
SendMsgToDnode
==
NULL
)
return
-
1
;
if
(
tsMint
.
fp
.
SendMsgToMnode
==
NULL
)
return
-
1
;
if
(
tsMint
.
fp
.
SendRedirectMsg
==
NULL
)
return
-
1
;
if
(
tsMint
.
fp
.
GetDnodeEp
==
NULL
)
return
-
1
;
if
(
tsMint
.
dnodeId
<
0
)
return
-
1
;
if
(
tsMint
.
clusterId
<
0
)
return
-
1
;
if
(
tsMint
.
para
.
SendMsgToDnode
==
NULL
)
return
-
1
;
if
(
tsMint
.
para
.
SendMsgToMnode
==
NULL
)
return
-
1
;
if
(
tsMint
.
para
.
SendRedirectMsg
==
NULL
)
return
-
1
;
if
(
tsMint
.
para
.
PutMsgIntoApplyQueue
==
NULL
)
return
-
1
;
if
(
tsMint
.
para
.
dnodeId
<
0
)
return
-
1
;
if
(
tsMint
.
para
.
clusterId
<
0
)
return
-
1
;
return
0
;
}
...
...
@@ -142,13 +136,13 @@ static void mnodeCleanupStep1() { taosStepCleanup(tsMint.steps1); }
static
void
mnodeCleanupStep2
()
{
taosStepCleanup
(
tsMint
.
steps2
);
}
static
bool
mnodeNeedDeploy
()
{
if
(
tsMint
.
dnodeId
>
0
)
return
false
;
if
(
tsMint
.
clusterId
>
0
)
return
false
;
if
(
tsMint
.
para
.
dnodeId
>
0
)
return
false
;
if
(
tsMint
.
para
.
clusterId
>
0
)
return
false
;
if
(
strcmp
(
tsFirst
,
tsLocalEp
)
!=
0
)
return
false
;
return
true
;
}
int32_t
mnodeDeploy
()
{
int32_t
mnodeDeploy
(
char
*
path
,
SMnodeCfg
*
pCfg
)
{
if
(
tsMint
.
state
!=
MN_STATUS_UNINIT
)
{
mError
(
"failed to deploy mnode since its deployed"
);
return
0
;
...
...
@@ -156,7 +150,7 @@ int32_t mnodeDeploy() {
tsMint
.
state
=
MN_STATUS_INIT
;
}
if
(
tsMint
.
dnodeId
<=
0
||
tsMint
.
clusterId
<=
0
)
{
if
(
tsMint
.
para
.
dnodeId
<=
0
||
tsMint
.
para
.
clusterId
<=
0
)
{
mError
(
"failed to deploy mnode since cluster not ready"
);
return
TSDB_CODE_MND_NOT_READY
;
}
...
...
@@ -183,7 +177,7 @@ int32_t mnodeDeploy() {
return
0
;
}
void
mnodeUnDeploy
()
{
void
mnodeUnDeploy
(
char
*
path
)
{
sdbUnDeploy
();
mnodeCleanup
();
}
...
...
@@ -251,5 +245,6 @@ void mnodeCleanup() {
}
}
int32_t
mnodeStart
()
{
return
0
;
}
int32_t
mnodeStart
(
char
*
path
,
SMnodeCfg
*
pCfg
)
{
return
0
;
}
int32_t
mnodeAlter
(
SMnodeCfg
*
pCfg
)
{
return
0
;
}
void
mnodeStop
()
{}
\ No newline at end of file
source/dnode/vnode/tq/CMakeLists.txt
浏览文件 @
f13b6149
...
...
@@ -12,3 +12,7 @@ target_link_libraries(
PUBLIC os
PUBLIC util
)
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
source/dnode/vnode/tq/inc/tqMetaStore.h
浏览文件 @
f13b6149
...
...
@@ -19,6 +19,11 @@
#include "os.h"
#include "tq.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#define TQ_BUCKET_SIZE 0xFF
#define TQ_PAGE_SIZE 4096
//key + offset + size
...
...
@@ -32,9 +37,23 @@ inline static int TqEmptyTail() { //16
return
TQ_PAGE_SIZE
-
TqMaxEntryOnePage
();
}
#ifdef __cplusplus
extern
"C"
{
#endif
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3
#define TQ_SVER 0
static
const
int8_t
TQ_CONST_DELETE
=
TQ_ACTION_CONST
;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
typedef
struct
TqSerializedHead
{
int16_t
ver
;
int16_t
action
;
int32_t
checksum
;
int64_t
ssize
;
char
content
[];
}
TqSerializedHead
;
typedef
struct
TqMetaHandle
{
int64_t
key
;
...
...
@@ -59,30 +78,33 @@ typedef struct TqMetaStore {
TqMetaList
*
unpersistHead
;
int
fileFd
;
//TODO:temporaral use, to be replaced by unified tfile
int
idxFd
;
//TODO:temporaral use, to be replaced by unified tfile
int
(
*
serializer
)(
TqGroupHandle
*
,
void
**
);
const
void
*
(
*
deserializer
)(
const
void
*
,
TqGroupHandle
*
);
char
*
dirPath
;
int
(
*
serializer
)(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
);
const
void
*
(
*
deserializer
)(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
);
void
(
*
deleter
)(
void
*
);
}
TqMetaStore
;
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
int
serializer
(
TqGroupHandle
*
,
void
**
),
const
void
*
deserializer
(
const
void
*
,
TqGroupHandle
*
),
void
deleter
(
void
*
));
int
serializer
(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
),
const
void
*
deserializer
(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
),
void
deleter
(
void
*
pObj
));
int32_t
tqStoreClose
(
TqMetaStore
*
);
//int32_t tqStoreDelete(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*);
int32_t
tqStorePersist
(
TqMetaStore
*
);
TqMetaHandle
*
tqHandleGet
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandlePut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
void
*
tqHandleGet
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleMovePut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleCopyPut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
,
size_t
vsize
);
//do commit
int32_t
tqHandleCommit
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleCommit
(
TqMetaStore
*
,
int64_t
key
);
//delete uncommitted
int32_t
tqHandleAbort
(
TqMetaStore
*
,
int64_t
key
);
//delete committed
int32_t
tqHandleDel
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleAbort
(
TqMetaStore
*
,
int64_t
key
);
//delete committed kv pair
//notice that a delete action still needs to be committed
int32_t
tqHandleDel
(
TqMetaStore
*
,
int64_t
key
);
//delete both committed and uncommitted
int32_t
tqHandleClear
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleClear
(
TqMetaStore
*
,
int64_t
key
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/tq/src/tqMetaStore.c
浏览文件 @
f13b6149
...
...
@@ -14,6 +14,7 @@
*/
#include "tqMetaStore.h"
//TODO:replace by an abstract file layer
#include "osDir.h"
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
...
...
@@ -22,8 +23,18 @@
#define TQ_IDX_NAME "tq.idx"
static
int32_t
tqHandlePutCommitted
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
static
TqMetaHandle
*
tqHandleGetUncommitted
(
TqMetaStore
*
,
int64_t
key
);
static
int32_t
tqHandlePutCommitted
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
static
void
*
tqHandleGetUncommitted
(
TqMetaStore
*
,
int64_t
key
);
static
inline
void
tqLinkUnpersist
(
TqMetaStore
*
pMeta
,
TqMetaList
*
pNode
)
{
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
}
typedef
struct
TqMetaPageBuf
{
int16_t
offset
;
...
...
@@ -31,23 +42,34 @@ typedef struct TqMetaPageBuf {
}
TqMetaPageBuf
;
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
int
serializer
(
TqGroupHandle
*
,
void
**
),
const
void
*
deserializer
(
const
void
*
,
TqGroupHandle
*
),
void
deleter
(
void
*
))
{
int
serializer
(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
),
const
void
*
deserializer
(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
),
void
deleter
(
void
*
pObj
))
{
TqMetaStore
*
pMeta
=
malloc
(
sizeof
(
TqMetaStore
));
if
(
pMeta
==
NULL
)
{
//close
return
NULL
;
}
memset
(
pMeta
,
0
,
sizeof
(
TqMetaStore
));
//concat data file name and index file name
size_t
pathLen
=
strlen
(
path
);
pMeta
->
dirPath
=
malloc
(
pathLen
+
1
);
if
(
pMeta
->
dirPath
!=
NULL
)
{
//TODO: memory insufficient
}
strcpy
(
pMeta
->
dirPath
,
path
);
char
name
[
pathLen
+
10
];
strcpy
(
name
,
path
);
if
(
!
taosDirExist
(
name
)
&&
!
taosMkDir
(
name
))
{
ASSERT
(
false
);
}
strcat
(
name
,
"/"
TQ_IDX_NAME
);
int
idxFd
=
open
(
name
,
O_
WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
int
idxFd
=
open
(
name
,
O_
RDWR
|
O_CREAT
,
0755
);
if
(
idxFd
<
0
)
{
ASSERT
(
false
);
//close file
//free memory
return
NULL
;
...
...
@@ -56,17 +78,24 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta
->
idxFd
=
idxFd
;
pMeta
->
unpersistHead
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pMeta
->
unpersistHead
==
NULL
)
{
ASSERT
(
false
);
//close file
//free memory
return
NULL
;
}
memset
(
pMeta
->
unpersistHead
,
0
,
sizeof
(
TqMetaList
));
pMeta
->
unpersistHead
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistPrev
=
pMeta
->
unpersistHead
;
strcpy
(
name
,
path
);
strcat
(
name
,
"/"
TQ_META_NAME
);
int
fileFd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
if
(
fileFd
<
0
)
return
NULL
;
int
fileFd
=
open
(
name
,
O_RDWR
|
O_CREAT
,
0755
);
if
(
fileFd
<
0
){
ASSERT
(
false
);
return
NULL
;
}
memset
(
pMeta
,
0
,
sizeof
(
TqMetaStore
));
pMeta
->
fileFd
=
fileFd
;
pMeta
->
serializer
=
serializer
;
...
...
@@ -74,23 +103,103 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta
->
deleter
=
deleter
;
//read idx file and load into memory
char
readBuf
[
TQ_PAGE_SIZE
];
int
readSize
;
while
((
readSize
=
read
(
idxFd
,
readBuf
,
TQ_PAGE_SIZE
))
!=
-
1
)
{
char
idxBuf
[
TQ_PAGE_SIZE
];
TqSerializedHead
*
serializedObj
=
malloc
(
TQ_PAGE_SIZE
);
if
(
serializedObj
==
NULL
)
{
//TODO:memory insufficient
}
int
idxRead
;
int
allocated
=
TQ_PAGE_SIZE
;
while
((
idxRead
=
read
(
idxFd
,
idxBuf
,
TQ_PAGE_SIZE
)))
{
if
(
idxRead
==
-
1
)
{
//TODO: handle error
ASSERT
(
false
);
}
//loop read every entry
for
(
int
i
=
0
;
i
<
readSize
;
i
+=
TQ_IDX_ENTRY_SIZE
)
{
TqMetaList
*
pNode
=
malloc
(
sizeof
(
TqMetaHandle
));
memset
(
pNode
,
0
,
sizeof
(
TqMetaList
));
for
(
int
i
=
0
;
i
<
idxRead
;
i
+=
TQ_IDX_ENTRY_SIZE
)
{
TqMetaList
*
pNode
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pNode
==
NULL
)
{
//TODO: free memory and return error
}
memcpy
(
&
pNode
->
handle
,
&
readBuf
[
i
],
TQ_IDX_ENTRY_SIZE
);
memset
(
pNode
,
0
,
sizeof
(
TqMetaList
));
memcpy
(
&
pNode
->
handle
,
&
idxBuf
[
i
],
TQ_IDX_ENTRY_SIZE
);
lseek
(
fileFd
,
pNode
->
handle
.
offset
,
SEEK_CUR
);
if
(
allocated
<
pNode
->
handle
.
serializedSize
)
{
void
*
ptr
=
realloc
(
serializedObj
,
pNode
->
handle
.
serializedSize
);
if
(
ptr
==
NULL
)
{
//TODO: memory insufficient
}
serializedObj
=
ptr
;
allocated
=
pNode
->
handle
.
serializedSize
;
}
serializedObj
->
ssize
=
pNode
->
handle
.
serializedSize
;
if
(
read
(
fileFd
,
serializedObj
,
pNode
->
handle
.
serializedSize
)
!=
pNode
->
handle
.
serializedSize
)
{
//TODO: read error
}
if
(
serializedObj
->
action
==
TQ_ACTION_INUSE
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
TqSerializedHead
))
{
pMeta
->
deserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
}
else
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
}
else
if
(
serializedObj
->
action
==
TQ_ACTION_INTXN
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
TqSerializedHead
))
{
pMeta
->
deserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInTxn
);
}
else
{
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
}
else
if
(
serializedObj
->
action
==
TQ_ACTION_INUSE_CONT
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
TqSerializedHead
))
{
pMeta
->
deserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
}
else
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
serializedObj
=
POINTER_SHIFT
(
serializedObj
,
serializedObj
->
ssize
);
if
(
serializedObj
->
ssize
!=
sizeof
(
TqSerializedHead
))
{
pMeta
->
deserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInTxn
);
}
else
{
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
}
else
{
ASSERT
(
0
);
}
//put into list
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_SIZE
;
pNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
TqMetaList
*
pBucketNode
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketNode
==
NULL
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
}
else
if
(
pBucketNode
->
handle
.
key
==
pNode
->
handle
.
key
)
{
pNode
->
next
=
pBucketNode
->
next
;
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
}
else
{
while
(
pBucketNode
->
next
&&
pBucketNode
->
next
->
handle
.
key
==
pNode
->
handle
.
key
)
{
pBucketNode
=
pBucketNode
->
next
;
}
if
(
pBucketNode
->
next
)
{
ASSERT
(
pBucketNode
->
next
->
handle
.
key
==
pNode
->
handle
.
key
);
TqMetaList
*
pNodeTmp
=
pBucketNode
->
next
;
pBucketNode
->
next
=
pNodeTmp
->
next
;
pBucketNode
=
pNodeTmp
;
}
else
{
pBucketNode
=
NULL
;
}
}
if
(
pBucketNode
)
{
if
(
pBucketNode
->
handle
.
valueInUse
&&
pBucketNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pBucketNode
->
handle
.
valueInUse
);
}
if
(
pBucketNode
->
handle
.
valueInTxn
&&
pBucketNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pBucketNode
->
handle
.
valueInTxn
);
}
free
(
pBucketNode
);
}
}
}
free
(
serializedObj
);
return
pMeta
;
}
...
...
@@ -102,30 +211,54 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
close
(
pMeta
->
idxFd
);
//free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
TqMetaList
*
n
ode
=
pMeta
->
bucket
[
i
];
pMeta
->
bucket
[
i
]
=
NULL
;
while
(
node
)
{
ASSERT
(
node
->
unpersistNext
==
NULL
);
ASSERT
(
node
->
unpersistPrev
==
NULL
);
if
(
node
->
handle
.
valueInTxn
)
{
pMeta
->
deleter
(
n
ode
->
handle
.
valueInTxn
);
TqMetaList
*
pN
ode
=
pMeta
->
bucket
[
i
];
while
(
pNode
)
{
ASSERT
(
pNode
->
unpersistNext
==
NULL
);
ASSERT
(
pNode
->
unpersistPrev
==
NULL
);
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pN
ode
->
handle
.
valueInTxn
);
}
if
(
node
->
handle
.
valueInUse
)
{
pMeta
->
deleter
(
node
->
handle
.
valueInUse
);
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
TqMetaList
*
next
=
n
ode
->
next
;
free
(
n
ode
);
n
ode
=
next
;
TqMetaList
*
next
=
pN
ode
->
next
;
free
(
pN
ode
);
pN
ode
=
next
;
}
}
free
(
pMeta
->
dirPath
);
free
(
pMeta
->
unpersistHead
);
free
(
pMeta
);
return
0
;
}
int32_t
tqStoreDelete
(
TqMetaStore
*
pMeta
)
{
//close file
//delete file
close
(
pMeta
->
fileFd
);
close
(
pMeta
->
idxFd
);
//free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
TqMetaList
*
pNode
=
pMeta
->
bucket
[
i
];
pMeta
->
bucket
[
i
]
=
NULL
;
while
(
pNode
)
{
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
TqMetaList
*
next
=
pNode
->
next
;
free
(
pNode
);
pNode
=
next
;
}
}
free
(
pMeta
->
unpersistHead
);
taosRemoveDir
(
pMeta
->
dirPath
);
free
(
pMeta
->
dirPath
);
free
(
pMeta
);
return
0
;
}
...
...
@@ -135,69 +268,89 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
int64_t
*
bufPtr
=
(
int64_t
*
)
writeBuf
;
TqMetaList
*
pHead
=
pMeta
->
unpersistHead
;
TqMetaList
*
pNode
=
pHead
->
unpersistNext
;
TqSerializedHead
*
pSHead
=
malloc
(
sizeof
(
TqSerializedHead
));
if
(
pSHead
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
pSHead
->
ver
=
TQ_SVER
;
pSHead
->
checksum
=
0
;
pSHead
->
ssize
=
sizeof
(
TqSerializedHead
);
int
allocatedSize
=
sizeof
(
TqSerializedHead
);
int
offset
=
lseek
(
pMeta
->
fileFd
,
0
,
SEEK_CUR
);
while
(
pHead
!=
pNode
)
{
if
(
pNode
->
handle
.
valueInUse
==
NULL
)
{
//put delete token in data file
uint32_t
delete
=
TQ_ACTION_DELETE
;
int
nBytes
=
write
(
pMeta
->
fileFd
,
&
delete
,
sizeof
(
uint32_t
));
ASSERT
(
nBytes
==
sizeof
(
uint32_t
));
int
nBytes
=
0
;
//remove from list
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pBucketHead
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketHead
==
pNode
)
{
pMeta
->
bucket
[
bucketKey
]
=
pBucketHead
->
next
;
if
(
pNode
->
handle
.
valueInUse
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
pSHead
->
action
=
TQ_ACTION_INUSE_CONT
;
}
else
{
TqMetaList
*
pBucketNode
=
pBucketHead
;
while
(
pBucketNode
->
next
!=
NULL
&&
pBucketNode
->
next
!=
pNode
)
{
pBucketNode
=
pBucketNode
->
next
;
}
if
(
pBucketNode
->
next
!=
NULL
)
{
ASSERT
(
pBucketNode
->
next
==
pNode
);
pBucketNode
->
next
=
pNode
->
next
;
if
(
pNode
->
handle
.
valueInUse
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
free
(
pNode
);
}
pSHead
->
action
=
TQ_ACTION_INUSE
;
}
if
(
pNode
->
handle
.
valueInUse
==
TQ_DELETE_TOKEN
)
{
pSHead
->
ssize
=
sizeof
(
TqSerializedHead
);
}
else
{
pMeta
->
serializer
(
pNode
->
handle
.
valueInUse
,
&
pSHead
);
}
nBytes
=
write
(
pMeta
->
fileFd
,
pSHead
,
pSHead
->
ssize
);
ASSERT
(
nBytes
==
pSHead
->
ssize
);
}
//serialize
void
*
pBytes
=
NULL
;
int
sz
=
pMeta
->
serializer
(
pNode
->
handle
.
valueInUse
,
&
pBytes
);
ASSERT
(
pBytes
!=
NULL
);
//get current offset
//append data
int64_t
offset
=
lseek
(
pMeta
->
fileFd
,
0
,
SEEK_CUR
);
int
nBytes
=
write
(
pMeta
->
fileFd
,
pBytes
,
sz
);
//TODO: handle error in tfile
ASSERT
(
nBytes
==
sz
);
pNode
->
handle
.
offset
=
offset
;
pNode
->
handle
.
serializedSize
=
sz
;
if
(
pNode
->
handle
.
valueInTxn
)
{
pSHead
->
action
=
TQ_ACTION_INTXN
;
if
(
pNode
->
handle
.
valueInTxn
==
TQ_DELETE_TOKEN
)
{
pSHead
->
ssize
=
sizeof
(
TqSerializedHead
);
}
else
{
pMeta
->
serializer
(
pNode
->
handle
.
valueInTxn
,
&
pSHead
);
}
int
nBytesTxn
=
write
(
pMeta
->
fileFd
,
pSHead
,
pSHead
->
ssize
);
ASSERT
(
nBytesTxn
==
pSHead
->
ssize
);
nBytes
+=
nBytesTxn
;
}
//write idx
//write idx
file
//TODO: endian check and convert
*
(
bufPtr
++
)
=
pNode
->
handle
.
key
;
*
(
bufPtr
++
)
=
pNode
->
handle
.
offset
;
*
(
bufPtr
++
)
=
(
int64_t
)
sz
;
*
(
bufPtr
++
)
=
(
int64_t
)
nBytes
;
if
((
char
*
)(
bufPtr
+
3
)
>
writeBuf
+
TQ_PAGE_SIZE
)
{
nBytes
=
write
(
pMeta
->
idxFd
,
writeBuf
,
sizeof
(
writeBuf
));
//TODO: handle error
in
tfile
//TODO: handle error
with
tfile
ASSERT
(
nBytes
==
sizeof
(
writeBuf
));
memset
(
writeBuf
,
0
,
TQ_PAGE_SIZE
);
bufPtr
=
(
int64_t
*
)
writeBuf
;
}
//remove from unpersist list
pHead
->
unpersistNext
=
pNode
->
unpersistNext
;
pHead
->
unpersistNext
->
unpersistPrev
=
pHead
;
pNode
->
unpersistPrev
=
pNode
->
unpersistNext
=
NULL
;
pNode
=
pHead
->
unpersistNext
;
//remove from bucket
if
(
pNode
->
handle
.
valueInUse
==
TQ_DELETE_TOKEN
&&
pNode
->
handle
.
valueInTxn
==
NULL
)
{
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pBucketHead
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketHead
==
pNode
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
->
next
;
}
else
{
TqMetaList
*
pBucketNode
=
pBucketHead
;
while
(
pBucketNode
->
next
!=
NULL
&&
pBucketNode
->
next
!=
pNode
)
{
pBucketNode
=
pBucketNode
->
next
;
}
//impossible for pBucket->next == NULL
ASSERT
(
pBucketNode
->
next
==
pNode
);
pBucketNode
->
next
=
pNode
->
next
;
}
free
(
pNode
);
}
}
//write left bytes
free
(
pSHead
);
if
((
char
*
)
bufPtr
!=
writeBuf
)
{
int
used
=
(
char
*
)
bufPtr
-
writeBuf
;
int
nBytes
=
write
(
pMeta
->
idxFd
,
writeBuf
,
used
);
...
...
@@ -216,7 +369,10 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
//change pointer ownership
pNode
->
handle
.
valueInUse
=
value
;
return
0
;
...
...
@@ -240,13 +396,13 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
return
0
;
}
TqMetaHandle
*
tqHandleGet
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
void
*
tqHandleGet
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
return
&
pNode
->
handl
e
;
return
pNode
->
handle
.
valueInUs
e
;
}
else
{
return
NULL
;
}
...
...
@@ -257,15 +413,19 @@ TqMetaHandle* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
return
NULL
;
}
int32_t
tqHandlePut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int32_t
tqHandle
Move
Put
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
//change pointer ownership
pNode
->
handle
.
valueInTxn
=
value
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
pNode
=
pNode
->
next
;
...
...
@@ -279,16 +439,58 @@ int32_t tqHandlePut(TqMetaStore* pMeta, int64_t key, void* value) {
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
value
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNewNode
;
tqLinkUnpersist
(
pMeta
,
pNewNode
);
return
0
;
}
static
TqMetaHandle
*
tqHandleGetUncommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int32_t
tqHandleCopyPut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
,
size_t
vsize
)
{
void
*
vmem
=
malloc
(
vsize
);
if
(
vmem
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memcpy
(
vmem
,
value
,
vsize
);
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
return
&
pNode
->
handle
;
//TODO: think about thread safety
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
//change pointer ownership
pNode
->
handle
.
valueInTxn
=
vmem
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
TqMetaList
*
pNewNode
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pNewNode
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
vmem
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNewNode
;
tqLinkUnpersist
(
pMeta
,
pNewNode
);
return
0
;
}
static
void
*
tqHandleGetUncommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
return
pNode
->
handle
.
valueInTxn
;
}
else
{
return
NULL
;
}
...
...
@@ -304,16 +506,13 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
pNode
->
handle
.
valueInUse
=
pNode
->
handle
.
valueInTxn
;
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
pNode
->
handle
.
valueInTxn
=
NULL
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
pNode
=
pNode
->
next
;
...
...
@@ -327,9 +526,12 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
if
(
pNode
->
handle
.
valueInTxn
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
pNode
->
handle
.
valueInTxn
=
NULL
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
return
-
1
;
...
...
@@ -344,9 +546,11 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
pNode
->
handle
.
valueInTxn
=
NULL
;
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
pNode
=
pNode
->
next
;
...
...
@@ -364,21 +568,20 @@ int32_t tqHandleClear(TqMetaStore* pMeta, int64_t key) {
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
exist
=
true
;
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
pNode
->
handle
.
valueInUse
=
NULL
;
if
(
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
exist
=
true
;
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
pNode
->
handle
.
valueInTxn
=
NULL
;
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
if
(
exist
)
{
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
return
-
1
;
...
...
source/dnode/vnode/tq/test/CMakeLists.txt
0 → 100644
浏览文件 @
f13b6149
add_executable
(
tqTest
""
)
target_sources
(
tqTest
PRIVATE
"tqMetaTest.cpp"
)
target_include_directories
(
tqTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/server/vnode/tq"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
tqTest
tq
gtest_main
)
enable_testing
()
add_test
(
NAME tq_test
COMMAND tqTest
)
source/dnode/vnode/tq/test/tqMetaTest.cpp
0 → 100644
浏览文件 @
f13b6149
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
#include "tqMetaStore.h"
struct
Foo
{
int32_t
a
;
};
int
FooSerializer
(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
)
{
Foo
*
foo
=
(
Foo
*
)
pObj
;
if
((
*
ppHead
)
==
NULL
||
(
*
ppHead
)
->
ssize
<
sizeof
(
TqSerializedHead
)
+
sizeof
(
int32_t
))
{
*
ppHead
=
(
TqSerializedHead
*
)
realloc
(
*
ppHead
,
sizeof
(
TqSerializedHead
)
+
sizeof
(
int32_t
));
(
*
ppHead
)
->
ssize
=
sizeof
(
TqSerializedHead
)
+
sizeof
(
int32_t
);
}
*
(
int32_t
*
)(
*
ppHead
)
->
content
=
foo
->
a
;
return
(
*
ppHead
)
->
ssize
;
}
const
void
*
FooDeserializer
(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
)
{
if
(
*
ppObj
==
NULL
)
{
*
ppObj
=
realloc
(
*
ppObj
,
sizeof
(
int32_t
));
}
Foo
*
pFoo
=
*
(
Foo
**
)
ppObj
;
pFoo
->
a
=
*
(
int32_t
*
)
pHead
->
content
;
return
NULL
;
}
void
FooDeleter
(
void
*
pObj
)
{
free
(
pObj
);
}
class
TqMetaTest
:
public
::
testing
::
Test
{
protected:
void
SetUp
()
override
{
taosRemoveDir
(
pathName
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
ASSERT
(
pMeta
);
}
void
TearDown
()
override
{
tqStoreClose
(
pMeta
);
}
TqMetaStore
*
pMeta
;
const
char
*
pathName
=
"/tmp/tq_test"
;
};
TEST_F
(
TqMetaTest
,
copyPutTest
)
{
Foo
foo
;
foo
.
a
=
3
;
tqHandleCopyPut
(
pMeta
,
1
,
&
foo
,
sizeof
(
Foo
));
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
persistTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
2
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
Foo
*
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
tqHandleCommit
(
pMeta
,
1
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pBar
->
a
,
pFoo
->
a
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
2
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
ASSERT
(
pMeta
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
ASSERT_EQ
(
pBar
!=
NULL
,
true
);
EXPECT_EQ
(
pBar
->
a
,
2
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
2
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
//taosRemoveDir(pathName);
}
TEST_F
(
TqMetaTest
,
uncommittedTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
abortTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
tqHandleAbort
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
deleteTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
tqHandleCommit
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
);
EXPECT_EQ
(
pFoo
->
a
,
3
);
tqHandleDel
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
);
EXPECT_EQ
(
pFoo
->
a
,
3
);
tqHandleCommit
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
source/dnode/vnode/tq/test/tqTests.cpp
已删除
100644 → 0
浏览文件 @
3c8a25aa
source/os/src/osDir.c
浏览文件 @
f13b6149
...
...
@@ -55,7 +55,7 @@ void taosRemoveDir(const char *dirname) {
closedir
(
dir
);
rmdir
(
dirname
);
printf
(
"dir:%s is removed"
,
dirname
);
printf
(
"dir:%s is removed
\n
"
,
dirname
);
}
bool
taosDirExist
(
char
*
dirname
)
{
return
access
(
dirname
,
F_OK
)
==
0
;
}
...
...
@@ -138,4 +138,4 @@ bool taosRealPath(char *dirname, int32_t maxlen) {
return
true
;
}
#endif
\ No newline at end of file
#endif
source/util/src/terror.c
浏览文件 @
f13b6149
...
...
@@ -228,14 +228,18 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists"
// dnode
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_MSG_NOT_PROCESSED
,
"Message not processed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_OUT_OF_MEMORY
,
"Dnode out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_DNODE_ID_NOT_MATCHED
,
"Dnode Id not matched"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_MNODE_ALREADY_DROPPED
,
"Mnode already deployed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE
,
"Mnode Id not match Dnode"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED
,
"Mnode already deployed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
,
"Mnode not deployed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_READ_MNODE_FILE_ERROR
,
"Read mnode.json error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_WRITE_MNODE_FILE_ERROR
,
"Write mnode.json error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_NO_WRITE_ACCESS
,
"No permission for disk files in dnode"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_INVALID_MSG_LEN
,
"Invalid message length"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_ACTION_IN_PROGRESS
,
"Action in progress"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_TOO_MANY_VNODES
,
"Too many vnode directories"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_EXITING
,
"Dnode is exiting"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR
,
"Parse vnodes.json error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_PARSE_DNODE_FILE_ERROR
,
"Parse dnodes.json error"
)
// vnode
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_ACTION_IN_PROGRESS
,
"Action in progress"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录