Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d6606740
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d6606740
编写于
1月 11, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact dnode module
上级
25bc2b3a
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
356 addition
and
288 deletion
+356
-288
include/dnode/mgmt/dnode.h
include/dnode/mgmt/dnode.h
+33
-15
include/util/taoserror.h
include/util/taoserror.h
+2
-0
source/dnode/mgmt/daemon/src/daemon.c
source/dnode/mgmt/daemon/src/daemon.c
+53
-40
source/dnode/mgmt/impl/inc/dndEnv.h
source/dnode/mgmt/impl/inc/dndEnv.h
+120
-1
source/dnode/mgmt/impl/inc/dndInt.h
source/dnode/mgmt/impl/inc/dndInt.h
+4
-117
source/dnode/mgmt/impl/src/dndBnode.c
source/dnode/mgmt/impl/src/dndBnode.c
+1
-1
source/dnode/mgmt/impl/src/dndEnv.c
source/dnode/mgmt/impl/src/dndEnv.c
+81
-56
source/dnode/mgmt/impl/src/dndMgmt.c
source/dnode/mgmt/impl/src/dndMgmt.c
+13
-13
source/dnode/mgmt/impl/src/dndMnode.c
source/dnode/mgmt/impl/src/dndMnode.c
+12
-12
source/dnode/mgmt/impl/src/dndQnode.c
source/dnode/mgmt/impl/src/dndQnode.c
+1
-1
source/dnode/mgmt/impl/src/dndSnode.c
source/dnode/mgmt/impl/src/dndSnode.c
+1
-1
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+5
-5
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+5
-5
source/dnode/mgmt/impl/test/sut/inc/server.h
source/dnode/mgmt/impl/test/sut/inc/server.h
+1
-1
source/dnode/mgmt/impl/test/sut/src/server.cpp
source/dnode/mgmt/impl/test/sut/src/server.cpp
+18
-20
source/dnode/mgmt/impl/test/sut/src/sut.cpp
source/dnode/mgmt/impl/test/sut/src/sut.cpp
+4
-0
source/util/src/terror.c
source/util/src/terror.c
+2
-0
未找到文件。
include/dnode/mgmt/dnode.h
浏览文件 @
d6606740
...
...
@@ -22,15 +22,39 @@
extern
"C"
{
#endif
/* ------------------------ TYPES EXPOSED ----------------
--------
*/
/* ------------------------ TYPES EXPOSED ---------------- */
typedef
struct
SDnode
SDnode
;
/* ------------------------ Environment ------------------ */
typedef
struct
{
int32_t
sver
;
int32_t
numOfCores
;
int8_t
enableTelem
;
char
timezone
[
TSDB_TIMEZONE_LEN
];
char
locale
[
TSDB_LOCALE_LEN
];
char
charset
[
TSDB_LOCALE_LEN
];
char
buildinfo
[
64
];
char
gitinfo
[
48
];
}
SDnodeEnvCfg
;
/**
* @brief Initialize the environment
*
* @param pOption Option of the environment
* @return int32_t 0 for success and -1 for failure
*/
int32_t
dndInit
(
const
SDnodeEnvCfg
*
pCfg
);
/**
* @brief clear the environment
*
*/
void
dndCleanup
();
/* ------------------------ SDnode ----------------------- */
typedef
struct
{
int32_t
sver
;
int32_t
numOfCores
;
int32_t
numOfSupportVnodes
;
int16_t
numOfCommitThreads
;
int
8_t
enableTelem
;
int
32_t
numOfSupportVnodes
;
int32_t
statusInterval
;
float
numOfThreadsPerCore
;
float
ratioOfQueryCores
;
...
...
@@ -41,28 +65,22 @@ typedef struct {
char
localEp
[
TSDB_EP_LEN
];
char
localFqdn
[
TSDB_FQDN_LEN
];
char
firstEp
[
TSDB_EP_LEN
];
char
timezone
[
TSDB_TIMEZONE_LEN
];
char
locale
[
TSDB_LOCALE_LEN
];
char
charset
[
TSDB_LOCALE_LEN
];
char
buildinfo
[
64
];
char
gitinfo
[
48
];
}
SDnodeOpt
;
}
SDnodeObjCfg
;
/* ------------------------ SDnode ------------------------ */
/**
* @brief Initialize and start the dnode.
*
* @param p
Option Option
of the dnode.
* @param p
Cfg Config
of the dnode.
* @return SDnode* The dnode object.
*/
SDnode
*
dnd
Init
(
SDnodeOpt
*
pOption
);
SDnode
*
dnd
Create
(
SDnodeObjCfg
*
pCfg
);
/**
* @brief Stop and cleanup the dnode.
*
* @param pDnode The dnode object to close.
*/
void
dndCl
eanup
(
SDnode
*
pDnode
);
void
dndCl
ose
(
SDnode
*
pDnode
);
#ifdef __cplusplus
}
...
...
include/util/taoserror.h
浏览文件 @
d6606740
...
...
@@ -70,6 +70,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108)
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109)
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x010A)
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x010B)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112)
...
...
source/dnode/mgmt/daemon/src/daemon.c
浏览文件 @
d6606740
...
...
@@ -28,11 +28,11 @@ static struct {
bool
printAuth
;
bool
printVersion
;
char
configDir
[
PATH_MAX
];
}
global
=
{
0
};
}
dmn
=
{
0
};
void
dmnSigintHandle
(
int
signum
,
void
*
info
,
void
*
ctx
)
{
uInfo
(
"singal:%d is received"
,
signum
);
global
.
stop
=
true
;
dmn
.
stop
=
true
;
}
void
dmnSetSignalHandle
()
{
...
...
@@ -44,7 +44,7 @@ void dmnSetSignalHandle() {
}
int
dmnParseOption
(
int
argc
,
char
const
*
argv
[])
{
tstrncpy
(
global
.
configDir
,
"/etc/taos"
,
PATH_MAX
);
tstrncpy
(
dmn
.
configDir
,
"/etc/taos"
,
PATH_MAX
);
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
...
...
@@ -53,19 +53,19 @@ int dmnParseOption(int argc, char const *argv[]) {
printf
(
"config file path overflow"
);
return
-
1
;
}
tstrncpy
(
global
.
configDir
,
argv
[
i
],
PATH_MAX
);
tstrncpy
(
dmn
.
configDir
,
argv
[
i
],
PATH_MAX
);
}
else
{
printf
(
"'-c' requires a parameter, default is %s
\n
"
,
configDir
);
return
-
1
;
}
}
else
if
(
strcmp
(
argv
[
i
],
"-C"
)
==
0
)
{
global
.
dumpConfig
=
true
;
dmn
.
dumpConfig
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
)
{
global
.
generateGrant
=
true
;
dmn
.
generateGrant
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-A"
)
==
0
)
{
global
.
printAuth
=
true
;
dmn
.
printAuth
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-V"
)
==
0
)
{
global
.
printVersion
=
true
;
dmn
.
printVersion
=
true
;
}
else
{
}
}
...
...
@@ -92,7 +92,7 @@ void dmnPrintVersion() {
}
int
dmnReadConfig
(
const
char
*
path
)
{
tstrncpy
(
configDir
,
global
.
configDir
,
PATH_MAX
);
tstrncpy
(
configDir
,
dmn
.
configDir
,
PATH_MAX
);
taosInitGlobalCfg
();
taosReadGlobalLogCfg
();
...
...
@@ -114,12 +114,12 @@ int dmnReadConfig(const char *path) {
}
if
(
taosReadCfgFromFile
()
!=
0
)
{
uError
(
"failed to read
global
config"
);
uError
(
"failed to read config"
);
return
-
1
;
}
if
(
taosCheckAndPrintCfg
()
!=
0
)
{
uError
(
"failed to check
global
config"
);
uError
(
"failed to check config"
);
return
-
1
;
}
...
...
@@ -131,38 +131,50 @@ void dmnDumpConfig() { taosDumpGlobalCfg(); }
void
dmnWaitSignal
()
{
dmnSetSignalHandle
();
while
(
!
global
.
stop
)
{
while
(
!
dmn
.
stop
)
{
taosMsleep
(
100
);
}
}
void
dmnInitOption
(
SDnodeOpt
*
pOption
)
{
pOption
->
sver
=
30000000
;
//3.0.0.0
pOption
->
numOfCores
=
tsNumOfCores
;
pOption
->
numOfSupportVnodes
=
tsNumOfSupportVnodes
;
pOption
->
numOfCommitThreads
=
tsNumOfCommitThreads
;
pOption
->
statusInterval
=
tsStatusInterval
;
pOption
->
numOfThreadsPerCore
=
tsNumOfThreadsPerCore
;
pOption
->
ratioOfQueryCores
=
tsRatioOfQueryCores
;
pOption
->
maxShellConns
=
tsMaxShellConns
;
pOption
->
shellActivityTimer
=
tsShellActivityTimer
;
pOption
->
serverPort
=
tsServerPort
;
tstrncpy
(
pOption
->
dataDir
,
tsDataDir
,
TSDB_FILENAME_LEN
);
tstrncpy
(
pOption
->
localEp
,
tsLocalEp
,
TSDB_EP_LEN
);
tstrncpy
(
pOption
->
localFqdn
,
tsLocalFqdn
,
TSDB_FQDN_LEN
);
tstrncpy
(
pOption
->
firstEp
,
tsFirst
,
TSDB_EP_LEN
);
tstrncpy
(
pOption
->
timezone
,
tsTimezone
,
TSDB_TIMEZONE_LEN
);
tstrncpy
(
pOption
->
locale
,
tsLocale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pOption
->
charset
,
tsCharset
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pOption
->
buildinfo
,
buildinfo
,
64
);
tstrncpy
(
pOption
->
gitinfo
,
gitinfo
,
48
);
void
dnmInitEnvCfg
(
SDnodeEnvCfg
*
pCfg
)
{
pCfg
->
sver
=
30000000
;
// 3.0.0.0
pCfg
->
numOfCores
=
tsNumOfCores
;
pCfg
->
enableTelem
=
0
;
tstrncpy
(
pCfg
->
timezone
,
tsTimezone
,
TSDB_TIMEZONE_LEN
);
tstrncpy
(
pCfg
->
locale
,
tsLocale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pCfg
->
charset
,
tsCharset
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pCfg
->
buildinfo
,
buildinfo
,
64
);
tstrncpy
(
pCfg
->
gitinfo
,
gitinfo
,
48
);
}
void
dmnInitObjCfg
(
SDnodeObjCfg
*
pCfg
)
{
pCfg
->
numOfSupportVnodes
=
tsNumOfSupportVnodes
;
pCfg
->
numOfCommitThreads
=
tsNumOfCommitThreads
;
pCfg
->
statusInterval
=
tsStatusInterval
;
pCfg
->
numOfThreadsPerCore
=
tsNumOfThreadsPerCore
;
pCfg
->
ratioOfQueryCores
=
tsRatioOfQueryCores
;
pCfg
->
maxShellConns
=
tsMaxShellConns
;
pCfg
->
shellActivityTimer
=
tsShellActivityTimer
;
pCfg
->
serverPort
=
tsServerPort
;
tstrncpy
(
pCfg
->
dataDir
,
tsDataDir
,
TSDB_FILENAME_LEN
);
tstrncpy
(
pCfg
->
localEp
,
tsLocalEp
,
TSDB_EP_LEN
);
tstrncpy
(
pCfg
->
localFqdn
,
tsLocalFqdn
,
TSDB_FQDN_LEN
);
tstrncpy
(
pCfg
->
firstEp
,
tsFirst
,
TSDB_EP_LEN
);
}
int
dmnRunDnode
()
{
SDnodeOpt
option
=
{
0
};
dmnInitOption
(
&
option
);
SDnodeEnvCfg
envCfg
=
{
0
};
SDnodeObjCfg
objCfg
=
{
0
};
dnmInitEnvCfg
(
&
envCfg
);
dmnInitObjCfg
(
&
objCfg
);
if
(
dndInit
(
&
envCfg
)
!=
0
)
{
uInfo
(
"Failed to start TDengine, please check the log at %s"
,
tsLogDir
);
return
-
1
;
}
SDnode
*
pDnode
=
dnd
Init
(
&
option
);
SDnode
*
pDnode
=
dnd
Create
(
&
objCfg
);
if
(
pDnode
==
NULL
)
{
uInfo
(
"Failed to start TDengine, please check the log at %s"
,
tsLogDir
);
return
-
1
;
...
...
@@ -172,7 +184,8 @@ int dmnRunDnode() {
dmnWaitSignal
();
uInfo
(
"TDengine is shut down!"
);
dndCleanup
(
pDnode
);
dndClose
(
pDnode
);
dndCleanup
();
taosCloseLog
();
return
0
;
}
...
...
@@ -182,21 +195,21 @@ int main(int argc, char const *argv[]) {
return
-
1
;
}
if
(
global
.
generateGrant
)
{
if
(
dmn
.
generateGrant
)
{
dmnGenerateGrant
();
return
0
;
}
if
(
global
.
printVersion
)
{
if
(
dmn
.
printVersion
)
{
dmnPrintVersion
();
return
0
;
}
if
(
dmnReadConfig
(
global
.
configDir
)
!=
0
)
{
if
(
dmnReadConfig
(
dmn
.
configDir
)
!=
0
)
{
return
-
1
;
}
if
(
global
.
dumpConfig
)
{
if
(
dmn
.
dumpConfig
)
{
dmnDumpConfig
();
return
0
;
}
...
...
source/dnode/mgmt/impl/inc/dndEnv.h
浏览文件 @
d6606740
...
...
@@ -20,8 +20,127 @@
extern
"C"
{
#endif
#include "dnd
Env
.h"
#include "dnd
Int
.h"
typedef
struct
{
EWorkerType
type
;
const
char
*
name
;
int32_t
minNum
;
int32_t
maxNum
;
void
*
queueFp
;
SDnode
*
pDnode
;
STaosQueue
*
queue
;
union
{
SWorkerPool
pool
;
SMWorkerPool
mpool
;
};
}
SDnodeWorker
;
typedef
struct
{
char
*
dnode
;
char
*
mnode
;
char
*
snode
;
char
*
bnode
;
char
*
vnodes
;
}
SDnodeDir
;
typedef
struct
{
int32_t
dnodeId
;
int32_t
dropped
;
int64_t
clusterId
;
int64_t
dver
;
int64_t
rebootTime
;
int64_t
updateTime
;
int8_t
statusSent
;
SEpSet
mnodeEpSet
;
char
*
file
;
SHashObj
*
dnodeHash
;
SDnodeEps
*
dnodeEps
;
pthread_t
*
threadId
;
SRWLatch
latch
;
SDnodeWorker
mgmtWorker
;
SDnodeWorker
statusWorker
;
}
SDnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SMnode
*
pMnode
;
SRWLatch
latch
;
SDnodeWorker
readWorker
;
SDnodeWorker
writeWorker
;
SDnodeWorker
syncWorker
;
int8_t
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SMnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SQnode
*
pQnode
;
SRWLatch
latch
;
SDnodeWorker
queryWorker
;
SDnodeWorker
fetchWorker
;
}
SQnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SSnode
*
pSnode
;
SRWLatch
latch
;
SDnodeWorker
writeWorker
;
}
SSnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SBnode
*
pBnode
;
SRWLatch
latch
;
SDnodeWorker
writeWorker
;
}
SBnodeMgmt
;
typedef
struct
{
SHashObj
*
hash
;
int32_t
openVnodes
;
int32_t
totalVnodes
;
SRWLatch
latch
;
SWorkerPool
queryPool
;
SWorkerPool
fetchPool
;
SMWorkerPool
syncPool
;
SMWorkerPool
writePool
;
}
SVnodesMgmt
;
typedef
struct
{
void
*
serverRpc
;
void
*
clientRpc
;
DndMsgFp
msgFp
[
TDMT_MAX
];
}
STransMgmt
;
typedef
struct
SDnode
{
EStat
stat
;
SDnodeObjCfg
cfg
;
SDnodeEnvCfg
env
;
SDnodeDir
dir
;
FileFd
lockFd
;
SDnodeMgmt
dmgmt
;
SMnodeMgmt
mmgmt
;
SQnodeMgmt
qmgmt
;
SSnodeMgmt
smgmt
;
SBnodeMgmt
bmgmt
;
SVnodesMgmt
vmgmt
;
STransMgmt
tmgmt
;
SStartupReq
startup
;
}
SDnode
;
typedef
struct
{
int8_t
once
;
SDnodeEnvCfg
cfg
;
}
SDnodeEnv
;
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/impl/inc/dndInt.h
浏览文件 @
d6606740
...
...
@@ -55,125 +55,12 @@ extern int32_t dDebugFlag;
typedef
enum
{
DND_STAT_INIT
,
DND_STAT_RUNNING
,
DND_STAT_STOPPED
}
EStat
;
typedef
enum
{
DND_WORKER_SINGLE
,
DND_WORKER_MULTI
}
EWorkerType
;
typedef
enum
{
DND_ENV_INIT
=
0
,
DND_ENV_READY
=
1
,
DND_ENV_CLEANUP
=
2
}
EEnvStat
;
typedef
void
(
*
DndMsgFp
)(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEps
);
typedef
struct
{
EWorkerType
type
;
const
char
*
name
;
int32_t
minNum
;
int32_t
maxNum
;
void
*
queueFp
;
SDnode
*
pDnode
;
STaosQueue
*
queue
;
union
{
SWorkerPool
pool
;
SMWorkerPool
mpool
;
};
}
SDnodeWorker
;
typedef
struct
{
char
*
dnode
;
char
*
mnode
;
char
*
snode
;
char
*
bnode
;
char
*
vnodes
;
}
SDnodeDir
;
typedef
struct
{
int32_t
dnodeId
;
int32_t
dropped
;
int64_t
clusterId
;
int64_t
dver
;
int64_t
rebootTime
;
int64_t
updateTime
;
int8_t
statusSent
;
SEpSet
mnodeEpSet
;
char
*
file
;
SHashObj
*
dnodeHash
;
SDnodeEps
*
dnodeEps
;
pthread_t
*
threadId
;
SRWLatch
latch
;
SDnodeWorker
mgmtWorker
;
SDnodeWorker
statusWorker
;
}
SDnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SMnode
*
pMnode
;
SRWLatch
latch
;
SDnodeWorker
readWorker
;
SDnodeWorker
writeWorker
;
SDnodeWorker
syncWorker
;
int8_t
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SMnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SQnode
*
pQnode
;
SRWLatch
latch
;
SDnodeWorker
queryWorker
;
SDnodeWorker
fetchWorker
;
}
SQnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SSnode
*
pSnode
;
SRWLatch
latch
;
SDnodeWorker
writeWorker
;
}
SSnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SBnode
*
pBnode
;
SRWLatch
latch
;
SDnodeWorker
writeWorker
;
}
SBnodeMgmt
;
typedef
struct
{
SHashObj
*
hash
;
int32_t
openVnodes
;
int32_t
totalVnodes
;
SRWLatch
latch
;
SWorkerPool
queryPool
;
SWorkerPool
fetchPool
;
SMWorkerPool
syncPool
;
SMWorkerPool
writePool
;
}
SVnodesMgmt
;
typedef
struct
{
void
*
serverRpc
;
void
*
clientRpc
;
DndMsgFp
msgFp
[
TDMT_MAX
];
}
STransMgmt
;
typedef
struct
SDnode
{
EStat
stat
;
SDnodeOpt
opt
;
SDnodeDir
dir
;
FileFd
lockFd
;
SDnodeMgmt
dmgmt
;
SMnodeMgmt
mmgmt
;
SQnodeMgmt
qmgmt
;
SSnodeMgmt
smgmt
;
SBnodeMgmt
bmgmt
;
SVnodesMgmt
vmgmt
;
STransMgmt
tmgmt
;
SStartupReq
startup
;
}
SDnode
;
EStat
dndGetStat
(
SDnode
*
pDnode
);
void
dndSetStat
(
SDnode
*
pDnode
,
EStat
stat
);
char
*
dndStatStr
(
EStat
stat
);
EStat
dndGetStat
(
SDnode
*
pDnode
);
void
dndSetStat
(
SDnode
*
pDnode
,
EStat
stat
);
const
char
*
dndStatStr
(
EStat
stat
);
void
dndReportStartup
(
SDnode
*
pDnode
,
char
*
pName
,
char
*
pDesc
);
void
dndGetStartup
(
SDnode
*
pDnode
,
SStartupReq
*
pStartup
);
...
...
source/dnode/mgmt/impl/src/dndBnode.c
浏览文件 @
d6606740
...
...
@@ -179,7 +179,7 @@ static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
pOption
->
sendRedirectRspFp
=
dndSendRedirectRsp
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
cfg
.
sver
=
pDnode
->
opt
.
sver
;
pOption
->
cfg
.
sver
=
pDnode
->
env
.
sver
;
}
static
int32_t
dndOpenBnode
(
SDnode
*
pDnode
)
{
...
...
source/dnode/mgmt/impl/src/dndEnv.c
浏览文件 @
d6606740
...
...
@@ -25,6 +25,8 @@
#include "tfs.h"
#include "wal.h"
static
SDnodeEnv
dndEnv
=
{
0
};
EStat
dndGetStat
(
SDnode
*
pDnode
)
{
return
pDnode
->
stat
;
}
void
dndSetStat
(
SDnode
*
pDnode
,
EStat
stat
)
{
...
...
@@ -32,7 +34,7 @@ void dndSetStat(SDnode *pDnode, EStat stat) {
pDnode
->
stat
=
stat
;
}
char
*
dndStatStr
(
EStat
stat
)
{
c
onst
c
har
*
dndStatStr
(
EStat
stat
)
{
switch
(
stat
)
{
case
DND_STAT_INIT
:
return
"init"
;
...
...
@@ -79,25 +81,26 @@ static FileFd dndCheckRunning(char *dataDir) {
return
fd
;
}
static
int32_t
dnd
InitEnv
(
SDnode
*
pDnode
,
SDnodeOpt
*
pOption
)
{
pDnode
->
lockFd
=
dndCheckRunning
(
p
Option
->
dataDir
);
static
int32_t
dnd
CreateImp
(
SDnode
*
pDnode
,
SDnodeObjCfg
*
pCfg
)
{
pDnode
->
lockFd
=
dndCheckRunning
(
p
Cfg
->
dataDir
);
if
(
pDnode
->
lockFd
<
0
)
{
return
-
1
;
}
char
path
[
PATH_MAX
+
100
];
snprintf
(
path
,
sizeof
(
path
),
"%s%smnode"
,
p
Option
->
dataDir
,
TD_DIRSEP
);
snprintf
(
path
,
sizeof
(
path
),
"%s%smnode"
,
p
Cfg
->
dataDir
,
TD_DIRSEP
);
pDnode
->
dir
.
mnode
=
tstrdup
(
path
);
snprintf
(
path
,
sizeof
(
path
),
"%s%svnode"
,
p
Option
->
dataDir
,
TD_DIRSEP
);
snprintf
(
path
,
sizeof
(
path
),
"%s%svnode"
,
p
Cfg
->
dataDir
,
TD_DIRSEP
);
pDnode
->
dir
.
vnodes
=
tstrdup
(
path
);
snprintf
(
path
,
sizeof
(
path
),
"%s%sdnode"
,
p
Option
->
dataDir
,
TD_DIRSEP
);
snprintf
(
path
,
sizeof
(
path
),
"%s%sdnode"
,
p
Cfg
->
dataDir
,
TD_DIRSEP
);
pDnode
->
dir
.
dnode
=
tstrdup
(
path
);
snprintf
(
path
,
sizeof
(
path
),
"%s%ssnode"
,
p
Option
->
dataDir
,
TD_DIRSEP
);
snprintf
(
path
,
sizeof
(
path
),
"%s%ssnode"
,
p
Cfg
->
dataDir
,
TD_DIRSEP
);
pDnode
->
dir
.
snode
=
tstrdup
(
path
);
snprintf
(
path
,
sizeof
(
path
),
"%s%sbnode"
,
p
Option
->
dataDir
,
TD_DIRSEP
);
snprintf
(
path
,
sizeof
(
path
),
"%s%sbnode"
,
p
Cfg
->
dataDir
,
TD_DIRSEP
);
pDnode
->
dir
.
bnode
=
tstrdup
(
path
);
if
(
pDnode
->
dir
.
mnode
==
NULL
||
pDnode
->
dir
.
vnodes
==
NULL
||
pDnode
->
dir
.
dnode
==
NULL
)
{
if
(
pDnode
->
dir
.
mnode
==
NULL
||
pDnode
->
dir
.
vnodes
==
NULL
||
pDnode
->
dir
.
dnode
==
NULL
||
pDnode
->
dir
.
snode
==
NULL
||
pDnode
->
dir
.
bnode
==
NULL
)
{
dError
(
"failed to malloc dir object"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -133,11 +136,12 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) {
return
-
1
;
}
memcpy
(
&
pDnode
->
opt
,
pOption
,
sizeof
(
SDnodeOpt
));
memcpy
(
&
pDnode
->
cfg
,
pCfg
,
sizeof
(
SDnodeObjCfg
));
memcpy
(
&
pDnode
->
env
,
&
dndEnv
.
cfg
,
sizeof
(
SDnodeEnvCfg
));
return
0
;
}
static
void
dndCl
eanupEnv
(
SDnode
*
pDnode
)
{
static
void
dndCl
oseImp
(
SDnode
*
pDnode
)
{
tfree
(
pDnode
->
dir
.
mnode
);
tfree
(
pDnode
->
dir
.
vnodes
);
tfree
(
pDnode
->
dir
.
dnode
);
...
...
@@ -149,126 +153,121 @@ static void dndCleanupEnv(SDnode *pDnode) {
taosCloseFile
(
pDnode
->
lockFd
);
pDnode
->
lockFd
=
0
;
}
taosStopCacheRefreshWorker
();
}
SDnode
*
dndInit
(
SDnodeOpt
*
pOption
)
{
taosIgnSIGPIPE
();
taosBlockSIGPIPE
();
taosResolveCRC
();
SDnode
*
dndCreate
(
SDnodeObjCfg
*
pCfg
)
{
dInfo
(
"start to create dnode object"
);
SDnode
*
pDnode
=
calloc
(
1
,
sizeof
(
SDnode
));
if
(
pDnode
==
NULL
)
{
dError
(
"failed to create dnode object"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
dError
(
"failed to create dnode object since %s"
,
terrstr
());
return
NULL
;
}
dInfo
(
"start to initialize TDengine"
);
dndSetStat
(
pDnode
,
DND_STAT_INIT
);
if
(
dnd
InitEnv
(
pDnode
,
pOption
)
!=
0
)
{
dError
(
"failed to init
env"
);
dndCl
eanup
(
pDnode
);
if
(
dnd
CreateImp
(
pDnode
,
pCfg
)
!=
0
)
{
dError
(
"failed to init
dnode dir since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
if
(
rpcInit
()
!=
0
)
{
dError
(
"failed to init rpc
env"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init rpc
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
if
(
walInit
()
!=
0
)
{
dError
(
"failed to init wal
env"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init wal
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
SDiskCfg
dCfg
;
strcpy
(
dCfg
.
dir
,
pDnode
->
opt
.
dataDir
);
strcpy
(
dCfg
.
dir
,
pDnode
->
cfg
.
dataDir
);
dCfg
.
level
=
0
;
dCfg
.
primary
=
1
;
if
(
tfsInit
(
&
dCfg
,
1
)
!=
0
)
{
dError
(
"failed to init tfs
env"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init tfs
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
SVnodeOpt
vnodeOpt
=
{
.
sver
=
pDnode
->
opt
.
sver
,
.
timezone
=
pDnode
->
opt
.
timezone
,
.
locale
=
pDnode
->
opt
.
locale
,
.
charset
=
pDnode
->
opt
.
charset
,
.
nthreads
=
pDnode
->
opt
.
numOfCommitThreads
,
.
sver
=
pDnode
->
env
.
sver
,
.
timezone
=
pDnode
->
env
.
timezone
,
.
locale
=
pDnode
->
env
.
locale
,
.
charset
=
pDnode
->
env
.
charset
,
.
nthreads
=
pDnode
->
cfg
.
numOfCommitThreads
,
.
putReqToVQueryQFp
=
dndPutReqToVQueryQ
,
};
if
(
vnodeInit
(
&
vnodeOpt
)
!=
0
)
{
dError
(
"failed to init vnode
env"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init vnode
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
if
(
dndInitMgmt
(
pDnode
)
!=
0
)
{
dError
(
"failed to init
dnode"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init
mgmt since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
if
(
dndInitVnodes
(
pDnode
)
!=
0
)
{
dError
(
"failed to init vnodes
"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init vnodes
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
if
(
dndInitQnode
(
pDnode
)
!=
0
)
{
dError
(
"failed to init qnode
"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init qnode
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
if
(
dndInitSnode
(
pDnode
)
!=
0
)
{
dError
(
"failed to init snode
"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init snode
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
if
(
dndInitBnode
(
pDnode
)
!=
0
)
{
dError
(
"failed to init bnode
"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init bnode
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
if
(
dndInitMnode
(
pDnode
)
!=
0
)
{
dError
(
"failed to init mnode
"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init mnode
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
if
(
dndInitTrans
(
pDnode
)
!=
0
)
{
dError
(
"failed to init transport
"
);
dndCl
eanup
(
pDnode
);
dError
(
"failed to init transport
since %s"
,
terrstr
()
);
dndCl
ose
(
pDnode
);
return
NULL
;
}
dndSetStat
(
pDnode
,
DND_STAT_RUNNING
);
dndSendStatusReq
(
pDnode
);
dndReportStartup
(
pDnode
,
"TDengine"
,
"initialized successfully"
);
dInfo
(
"
TDengine is initialized successfully, pDnode
:%p"
,
pDnode
);
dInfo
(
"
dnode object is created, data
:%p"
,
pDnode
);
return
pDnode
;
}
void
dndCl
eanup
(
SDnode
*
pDnode
)
{
void
dndCl
ose
(
SDnode
*
pDnode
)
{
if
(
pDnode
==
NULL
)
return
;
if
(
dndGetStat
(
pDnode
)
==
DND_STAT_STOPPED
)
{
dError
(
"dnode is shutting down
"
);
dError
(
"dnode is shutting down
, data:%p"
,
pDnode
);
return
;
}
dInfo
(
"start to cl
eanup TDengine"
);
dInfo
(
"start to cl
ose dnode, data:%p"
,
pDnode
);
dndSetStat
(
pDnode
,
DND_STAT_STOPPED
);
dndCleanupTrans
(
pDnode
);
dndStopMgmt
(
pDnode
);
...
...
@@ -283,7 +282,33 @@ void dndCleanup(SDnode *pDnode) {
walCleanUp
();
rpcCleanup
();
dndCl
eanupEnv
(
pDnode
);
dndCl
oseImp
(
pDnode
);
free
(
pDnode
);
dInfo
(
"
TDengine is cleaned up successfully"
);
dInfo
(
"
dnode object is closed, data:%p"
,
pDnode
);
}
int32_t
dndInit
(
const
SDnodeEnvCfg
*
pCfg
)
{
if
(
atomic_val_compare_exchange_8
(
&
dndEnv
.
once
,
DND_ENV_INIT
,
DND_ENV_READY
)
!=
DND_ENV_INIT
)
{
terrno
=
TSDB_CODE_REPEAT_INIT
;
dError
(
"failed to init dnode env since %s"
,
terrstr
());
return
-
1
;
}
taosIgnSIGPIPE
();
taosBlockSIGPIPE
();
taosResolveCRC
();
memcpy
(
&
dndEnv
.
cfg
,
pCfg
,
sizeof
(
SDnodeEnvCfg
));
dInfo
(
"dnode env is initialized"
);
return
0
;
}
void
dndCleanup
()
{
if
(
atomic_val_compare_exchange_8
(
&
dndEnv
.
once
,
DND_ENV_READY
,
DND_ENV_CLEANUP
)
!=
DND_ENV_READY
)
{
dError
(
"dnode env is already cleaned up"
);
return
;
}
taosStopCacheRefreshWorker
();
dInfo
(
"dnode env is cleaned up"
);
}
\ No newline at end of file
source/dnode/mgmt/impl/src/dndMgmt.c
浏览文件 @
d6606740
...
...
@@ -86,7 +86,7 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
dDebug
(
"RPC %p, req:%s is redirected, num:%d use:%d"
,
pReq
->
handle
,
TMSG_INFO
(
msgType
),
epSet
.
numOfEps
,
epSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
dDebug
(
"mnode index:%d %s:%u"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
]);
if
(
strcmp
(
epSet
.
fqdn
[
i
],
pDnode
->
opt
.
localFqdn
)
==
0
&&
epSet
.
port
[
i
]
==
pDnode
->
opt
.
serverPort
)
{
if
(
strcmp
(
epSet
.
fqdn
[
i
],
pDnode
->
cfg
.
localFqdn
)
==
0
&&
epSet
.
port
[
i
]
==
pDnode
->
cfg
.
serverPort
)
{
epSet
.
inUse
=
(
i
+
1
)
%
epSet
.
numOfEps
;
}
...
...
@@ -289,8 +289,8 @@ PRASE_DNODE_OVER:
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
if
(
dndIsEpChanged
(
pDnode
,
pMgmt
->
dnodeId
,
pDnode
->
opt
.
localEp
))
{
dError
(
"localEp %s different with %s and need reconfigured"
,
pDnode
->
opt
.
localEp
,
pMgmt
->
file
);
if
(
dndIsEpChanged
(
pDnode
,
pMgmt
->
dnodeId
,
pDnode
->
cfg
.
localEp
))
{
dError
(
"localEp %s different with %s and need reconfigured"
,
pDnode
->
cfg
.
localEp
,
pMgmt
->
file
);
return
-
1
;
}
...
...
@@ -298,7 +298,7 @@ PRASE_DNODE_OVER:
pMgmt
->
dnodeEps
=
calloc
(
1
,
sizeof
(
SDnodeEps
)
+
sizeof
(
SDnodeEp
));
pMgmt
->
dnodeEps
->
num
=
1
;
pMgmt
->
dnodeEps
->
eps
[
0
].
isMnode
=
1
;
taosGetFqdnPortFromEp
(
pDnode
->
opt
.
firstEp
,
pMgmt
->
dnodeEps
->
eps
[
0
].
fqdn
,
&
pMgmt
->
dnodeEps
->
eps
[
0
].
port
);
taosGetFqdnPortFromEp
(
pDnode
->
cfg
.
firstEp
,
pMgmt
->
dnodeEps
->
eps
[
0
].
fqdn
,
&
pMgmt
->
dnodeEps
->
eps
[
0
].
port
);
}
dndResetDnodes
(
pDnode
,
pMgmt
->
dnodeEps
);
...
...
@@ -362,24 +362,24 @@ void dndSendStatusReq(SDnode *pDnode) {
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
taosRLockLatch
(
&
pMgmt
->
latch
);
pStatus
->
sver
=
htonl
(
pDnode
->
opt
.
sver
);
pStatus
->
sver
=
htonl
(
pDnode
->
env
.
sver
);
pStatus
->
dver
=
htobe64
(
pMgmt
->
dver
);
pStatus
->
dnodeId
=
htonl
(
pMgmt
->
dnodeId
);
pStatus
->
clusterId
=
htobe64
(
pMgmt
->
clusterId
);
pStatus
->
rebootTime
=
htobe64
(
pMgmt
->
rebootTime
);
pStatus
->
updateTime
=
htobe64
(
pMgmt
->
updateTime
);
pStatus
->
numOfCores
=
htonl
(
pDnode
->
opt
.
numOfCores
);
pStatus
->
numOfSupportVnodes
=
htonl
(
pDnode
->
opt
.
numOfSupportVnodes
);
tstrncpy
(
pStatus
->
dnodeEp
,
pDnode
->
opt
.
localEp
,
TSDB_EP_LEN
);
pStatus
->
numOfCores
=
htonl
(
pDnode
->
env
.
numOfCores
);
pStatus
->
numOfSupportVnodes
=
htonl
(
pDnode
->
cfg
.
numOfSupportVnodes
);
tstrncpy
(
pStatus
->
dnodeEp
,
pDnode
->
cfg
.
localEp
,
TSDB_EP_LEN
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
pDnode
->
opt
.
statusInterval
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
pDnode
->
cfg
.
statusInterval
);
pStatus
->
clusterCfg
.
checkTime
=
0
;
char
timestr
[
32
]
=
"1970-01-01 00:00:00.00"
;
(
void
)
taosParseTime
(
timestr
,
&
pStatus
->
clusterCfg
.
checkTime
,
(
int32_t
)
strlen
(
timestr
),
TSDB_TIME_PRECISION_MILLI
,
0
);
pStatus
->
clusterCfg
.
checkTime
=
htonl
(
pStatus
->
clusterCfg
.
checkTime
);
tstrncpy
(
pStatus
->
clusterCfg
.
timezone
,
pDnode
->
opt
.
timezone
,
TSDB_TIMEZONE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
locale
,
pDnode
->
opt
.
locale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
charset
,
pDnode
->
opt
.
charset
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
timezone
,
pDnode
->
env
.
timezone
,
TSDB_TIMEZONE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
locale
,
pDnode
->
env
.
locale
,
TSDB_LOCALE_LEN
);
tstrncpy
(
pStatus
->
clusterCfg
.
charset
,
pDnode
->
env
.
charset
,
TSDB_LOCALE_LEN
);
taosRUnLockLatch
(
&
pMgmt
->
latch
);
dndGetVnodeLoads
(
pDnode
,
&
pStatus
->
vnodeLoads
);
...
...
@@ -485,7 +485,7 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
static
void
*
dnodeThreadRoutine
(
void
*
param
)
{
SDnode
*
pDnode
=
param
;
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
int32_t
ms
=
pDnode
->
opt
.
statusInterval
*
1000
;
int32_t
ms
=
pDnode
->
cfg
.
statusInterval
*
1000
;
while
(
true
)
{
pthread_testcancel
();
...
...
source/dnode/mgmt/impl/src/dndMnode.c
浏览文件 @
d6606740
...
...
@@ -247,7 +247,7 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
return
false
;
}
if
(
strcmp
(
pDnode
->
opt
.
localEp
,
pDnode
->
opt
.
firstEp
)
!=
0
)
{
if
(
strcmp
(
pDnode
->
cfg
.
localEp
,
pDnode
->
cfg
.
firstEp
)
!=
0
)
{
return
false
;
}
...
...
@@ -266,15 +266,15 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption
->
putReqToMWriteQFp
=
dndPutMsgToMWriteQ
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
cfg
.
sver
=
pDnode
->
opt
.
sver
;
pOption
->
cfg
.
enableTelem
=
pDnode
->
opt
.
enableTelem
;
pOption
->
cfg
.
statusInterval
=
pDnode
->
opt
.
statusInterval
;
pOption
->
cfg
.
shellActivityTimer
=
pDnode
->
opt
.
shellActivityTimer
;
pOption
->
cfg
.
timezone
=
pDnode
->
opt
.
timezone
;
pOption
->
cfg
.
charset
=
pDnode
->
opt
.
charset
;
pOption
->
cfg
.
locale
=
pDnode
->
opt
.
locale
;
pOption
->
cfg
.
gitinfo
=
pDnode
->
opt
.
gitinfo
;
pOption
->
cfg
.
buildinfo
=
pDnode
->
opt
.
buildinfo
;
pOption
->
cfg
.
sver
=
pDnode
->
env
.
sver
;
pOption
->
cfg
.
enableTelem
=
pDnode
->
env
.
enableTelem
;
pOption
->
cfg
.
statusInterval
=
pDnode
->
cfg
.
statusInterval
;
pOption
->
cfg
.
shellActivityTimer
=
pDnode
->
cfg
.
shellActivityTimer
;
pOption
->
cfg
.
timezone
=
pDnode
->
env
.
timezone
;
pOption
->
cfg
.
charset
=
pDnode
->
env
.
charset
;
pOption
->
cfg
.
locale
=
pDnode
->
env
.
locale
;
pOption
->
cfg
.
gitinfo
=
pDnode
->
env
.
gitinfo
;
pOption
->
cfg
.
buildinfo
=
pDnode
->
env
.
buildinfo
;
}
static
void
dndBuildMnodeDeployOption
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
...
...
@@ -283,8 +283,8 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption
->
selfIndex
=
0
;
SReplica
*
pReplica
=
&
pOption
->
replicas
[
0
];
pReplica
->
id
=
1
;
pReplica
->
port
=
pDnode
->
opt
.
serverPort
;
memcpy
(
pReplica
->
fqdn
,
pDnode
->
opt
.
localFqdn
,
TSDB_FQDN_LEN
);
pReplica
->
port
=
pDnode
->
cfg
.
serverPort
;
memcpy
(
pReplica
->
fqdn
,
pDnode
->
cfg
.
localFqdn
,
TSDB_FQDN_LEN
);
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
pMgmt
->
selfIndex
=
pOption
->
selfIndex
;
...
...
source/dnode/mgmt/impl/src/dndQnode.c
浏览文件 @
d6606740
...
...
@@ -185,7 +185,7 @@ static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
pOption
->
sendRedirectRspFp
=
dndSendRedirectRsp
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
cfg
.
sver
=
pDnode
->
opt
.
sver
;
pOption
->
cfg
.
sver
=
pDnode
->
env
.
sver
;
}
static
int32_t
dndOpenQnode
(
SDnode
*
pDnode
)
{
...
...
source/dnode/mgmt/impl/src/dndSnode.c
浏览文件 @
d6606740
...
...
@@ -179,7 +179,7 @@ static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
pOption
->
sendRedirectRspFp
=
dndSendRedirectRsp
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
cfg
.
sver
=
pDnode
->
opt
.
sver
;
pOption
->
cfg
.
sver
=
pDnode
->
env
.
sver
;
}
static
int32_t
dndOpenSnode
(
SDnode
*
pDnode
)
{
...
...
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
d6606740
...
...
@@ -176,7 +176,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
rpcInit
.
cfp
=
dndProcessResponse
;
rpcInit
.
sessions
=
1024
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
pDnode
->
opt
.
shellActivityTimer
*
1000
;
rpcInit
.
idleTime
=
pDnode
->
cfg
.
shellActivityTimer
*
1000
;
rpcInit
.
user
=
INTERNAL_USER
;
rpcInit
.
ckey
=
INTERNAL_CKEY
;
rpcInit
.
secret
=
INTERNAL_SECRET
;
...
...
@@ -325,20 +325,20 @@ static int32_t dndInitServer(SDnode *pDnode) {
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
dndInitMsgFp
(
pMgmt
);
int32_t
numOfThreads
=
(
int32_t
)((
pDnode
->
opt
.
numOfCores
*
pDnode
->
opt
.
numOfThreadsPerCore
)
/
2
.
0
);
int32_t
numOfThreads
=
(
int32_t
)((
pDnode
->
env
.
numOfCores
*
pDnode
->
cfg
.
numOfThreadsPerCore
)
/
2
.
0
);
if
(
numOfThreads
<
1
)
{
numOfThreads
=
1
;
}
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
pDnode
->
opt
.
serverPort
;
rpcInit
.
localPort
=
pDnode
->
cfg
.
serverPort
;
rpcInit
.
label
=
"DND-S"
;
rpcInit
.
numOfThreads
=
numOfThreads
;
rpcInit
.
cfp
=
dndProcessRequest
;
rpcInit
.
sessions
=
pDnode
->
opt
.
maxShellConns
;
rpcInit
.
sessions
=
pDnode
->
cfg
.
maxShellConns
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
rpcInit
.
idleTime
=
pDnode
->
opt
.
shellActivityTimer
*
1000
;
rpcInit
.
idleTime
=
pDnode
->
cfg
.
shellActivityTimer
*
1000
;
rpcInit
.
afp
=
dndRetrieveUserAuthInfo
;
rpcInit
.
parent
=
pDnode
;
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
d6606740
...
...
@@ -420,7 +420,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
pMgmt
->
totalVnodes
=
numOfVnodes
;
int32_t
threadNum
=
pDnode
->
opt
.
numOfCores
;
int32_t
threadNum
=
pDnode
->
env
.
numOfCores
;
int32_t
vnodesPerThread
=
numOfVnodes
/
threadNum
+
1
;
SVnodeThread
*
threads
=
calloc
(
threadNum
,
sizeof
(
SVnodeThread
));
...
...
@@ -904,11 +904,11 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
int32_t
maxFetchThreads
=
4
;
int32_t
minFetchThreads
=
MIN
(
maxFetchThreads
,
pDnode
->
opt
.
numOfCores
);
int32_t
minQueryThreads
=
MAX
((
int32_t
)(
pDnode
->
opt
.
numOfCores
*
pDnode
->
opt
.
ratioOfQueryCores
),
1
);
int32_t
minFetchThreads
=
MIN
(
maxFetchThreads
,
pDnode
->
env
.
numOfCores
);
int32_t
minQueryThreads
=
MAX
((
int32_t
)(
pDnode
->
env
.
numOfCores
*
pDnode
->
cfg
.
ratioOfQueryCores
),
1
);
int32_t
maxQueryThreads
=
minQueryThreads
;
int32_t
maxWriteThreads
=
MAX
(
pDnode
->
opt
.
numOfCores
,
1
);
int32_t
maxSyncThreads
=
MAX
(
pDnode
->
opt
.
numOfCores
/
2
,
1
);
int32_t
maxWriteThreads
=
MAX
(
pDnode
->
env
.
numOfCores
,
1
);
int32_t
maxSyncThreads
=
MAX
(
pDnode
->
env
.
numOfCores
/
2
,
1
);
SWorkerPool
*
pPool
=
&
pMgmt
->
queryPool
;
pPool
->
name
=
"vnode-query"
;
...
...
source/dnode/mgmt/impl/test/sut/inc/server.h
浏览文件 @
d6606740
...
...
@@ -24,7 +24,7 @@ class TestServer {
bool
DoStart
();
private:
SDnodeO
pt
BuildOption
(
const
char
*
path
,
const
char
*
fqdn
,
uint16_t
port
,
const
char
*
firstEp
);
SDnodeO
bjCfg
BuildOption
(
const
char
*
path
,
const
char
*
fqdn
,
uint16_t
port
,
const
char
*
firstEp
);
private:
SDnode
*
pDnode
;
...
...
source/dnode/mgmt/impl/test/sut/src/server.cpp
浏览文件 @
d6606740
...
...
@@ -22,30 +22,28 @@ void* serverLoop(void* param) {
}
}
SDnodeOpt
TestServer
::
BuildOption
(
const
char
*
path
,
const
char
*
fqdn
,
uint16_t
port
,
const
char
*
firstEp
)
{
SDnodeOpt
option
=
{
0
};
option
.
sver
=
1
;
option
.
numOfCores
=
1
;
option
.
numOfSupportVnodes
=
16
;
option
.
numOfCommitThreads
=
1
;
option
.
statusInterval
=
1
;
option
.
numOfThreadsPerCore
=
1
;
option
.
ratioOfQueryCores
=
1
;
option
.
maxShellConns
=
1000
;
option
.
shellActivityTimer
=
30
;
option
.
serverPort
=
port
;
strcpy
(
option
.
dataDir
,
path
);
snprintf
(
option
.
localEp
,
TSDB_EP_LEN
,
"%s:%u"
,
fqdn
,
port
);
snprintf
(
option
.
localFqdn
,
TSDB_FQDN_LEN
,
"%s"
,
fqdn
);
snprintf
(
option
.
firstEp
,
TSDB_EP_LEN
,
"%s"
,
firstEp
);
return
option
;
SDnodeObjCfg
TestServer
::
BuildOption
(
const
char
*
path
,
const
char
*
fqdn
,
uint16_t
port
,
const
char
*
firstEp
)
{
SDnodeObjCfg
cfg
=
{
0
};
cfg
.
numOfSupportVnodes
=
16
;
cfg
.
numOfCommitThreads
=
1
;
cfg
.
statusInterval
=
1
;
cfg
.
numOfThreadsPerCore
=
1
;
cfg
.
ratioOfQueryCores
=
1
;
cfg
.
maxShellConns
=
1000
;
cfg
.
shellActivityTimer
=
30
;
cfg
.
serverPort
=
port
;
strcpy
(
cfg
.
dataDir
,
path
);
snprintf
(
cfg
.
localEp
,
TSDB_EP_LEN
,
"%s:%u"
,
fqdn
,
port
);
snprintf
(
cfg
.
localFqdn
,
TSDB_FQDN_LEN
,
"%s"
,
fqdn
);
snprintf
(
cfg
.
firstEp
,
TSDB_EP_LEN
,
"%s"
,
firstEp
);
return
cfg
;
}
bool
TestServer
::
DoStart
()
{
SDnodeO
pt
option
=
BuildOption
(
path
,
fqdn
,
port
,
firstEp
);
SDnodeO
bjCfg
cfg
=
BuildOption
(
path
,
fqdn
,
port
,
firstEp
);
taosMkDir
(
path
);
pDnode
=
dnd
Init
(
&
option
);
pDnode
=
dnd
Create
(
&
cfg
);
if
(
pDnode
!=
NULL
)
{
return
false
;
}
...
...
@@ -81,7 +79,7 @@ void TestServer::Stop() {
}
if
(
pDnode
!=
NULL
)
{
dndCl
eanup
(
pDnode
);
dndCl
ose
(
pDnode
);
pDnode
=
NULL
;
}
}
source/dnode/mgmt/impl/test/sut/src/sut.cpp
浏览文件 @
d6606740
...
...
@@ -43,6 +43,9 @@ void Testbase::InitLog(const char* path) {
}
void
Testbase
::
Init
(
const
char
*
path
,
int16_t
port
)
{
SDnodeEnvCfg
cfg
=
{
0
};
dndInit
(
&
cfg
);
char
fqdn
[]
=
"localhost"
;
char
firstEp
[
TSDB_EP_LEN
]
=
{
0
};
snprintf
(
firstEp
,
TSDB_EP_LEN
,
"%s:%u"
,
fqdn
,
port
);
...
...
@@ -56,6 +59,7 @@ void Testbase::Init(const char* path, int16_t port) {
void
Testbase
::
Cleanup
()
{
server
.
Stop
();
client
.
Cleanup
();
dndCleanup
();
}
void
Testbase
::
Restart
()
{
server
.
Restart
();
}
...
...
source/util/src/terror.c
浏览文件 @
d6606740
...
...
@@ -80,6 +80,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error")
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_MSG
,
"Invalid config message"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MSG_NOT_PROCESSED
,
"Message not processed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_PARA
,
"Invalid parameters"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REPEAT_INIT
,
"Repeat initialization"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_NO_MEMORY
,
"Ref out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_FULL
,
"too many Ref Objs"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_ID_REMOVED
,
"Ref ID is removed"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录