Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ca467fd7
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ca467fd7
编写于
3月 14, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
shm
上级
edccd795
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
297 addition
and
124 deletion
+297
-124
include/common/tglobal.h
include/common/tglobal.h
+1
-1
include/dnode/mgmt/dnode.h
include/dnode/mgmt/dnode.h
+1
-1
source/common/src/tglobal.c
source/common/src/tglobal.c
+3
-3
source/dnode/mgmt/bnode/src/bmInt.c
source/dnode/mgmt/bnode/src/bmInt.c
+6
-0
source/dnode/mgmt/dnode/inc/dndInt.h
source/dnode/mgmt/dnode/inc/dndInt.h
+7
-7
source/dnode/mgmt/dnode/inc/dndTransport.h
source/dnode/mgmt/dnode/inc/dndTransport.h
+1
-1
source/dnode/mgmt/dnode/src/dndInt.c
source/dnode/mgmt/dnode/src/dndInt.c
+67
-69
source/dnode/mgmt/dnode/src/dndMain.c
source/dnode/mgmt/dnode/src/dndMain.c
+135
-26
source/dnode/mgmt/dnode/src/dndMgmt.c
source/dnode/mgmt/dnode/src/dndMgmt.c
+14
-1
source/dnode/mgmt/dnode/src/dndTransport.c
source/dnode/mgmt/dnode/src/dndTransport.c
+2
-8
source/dnode/mgmt/exec/src/dndExec.c
source/dnode/mgmt/exec/src/dndExec.c
+2
-2
source/dnode/mgmt/mnode/src/mmInt.c
source/dnode/mgmt/mnode/src/mmInt.c
+6
-1
source/dnode/mgmt/qnode/src/qmInt.c
source/dnode/mgmt/qnode/src/qmInt.c
+5
-0
source/dnode/mgmt/snode/src/smInt.c
source/dnode/mgmt/snode/src/smInt.c
+6
-0
source/dnode/mgmt/vnode/src/vmInt.c
source/dnode/mgmt/vnode/src/vmInt.c
+6
-0
source/dnode/mgmt/vnode/src/vmMgmt.c
source/dnode/mgmt/vnode/src/vmMgmt.c
+35
-4
未找到文件。
include/common/tglobal.h
浏览文件 @
ca467fd7
...
...
@@ -51,7 +51,7 @@ extern int32_t tsCompatibleModel;
extern
bool
tsEnableSlaveQuery
;
extern
bool
tsPrintAuth
;
extern
int64_t
tsTickPerDay
[
3
];
extern
bool
tsMultiProcess
;
extern
int32_t
tsMultiProcess
;
// monitor
extern
bool
tsEnableMonitor
;
...
...
include/dnode/mgmt/dnode.h
浏览文件 @
ca467fd7
...
...
@@ -72,7 +72,7 @@ void dndClose(SDnode *pDnode);
*
* @param pDnode The dnode object to run.
*/
void
dndRun
(
SDnode
*
pDnode
);
int32_t
dndRun
(
SDnode
*
pDnode
);
/**
* @brief Handle event in the dnode.
...
...
source/common/src/tglobal.c
浏览文件 @
ca467fd7
...
...
@@ -45,7 +45,7 @@ float tsRatioOfQueryCores = 1.0f;
int32_t
tsMaxBinaryDisplayWidth
=
30
;
bool
tsEnableSlaveQuery
=
1
;
bool
tsPrintAuth
=
0
;
bool
tsMultiProcess
=
0
;
int32_t
tsMultiProcess
=
0
;
// monitor
bool
tsEnableMonitor
=
1
;
...
...
@@ -340,7 +340,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddBool
(
pCfg
,
"printAuth"
,
tsPrintAuth
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"slaveQuery"
,
tsEnableSlaveQuery
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"deadLockKillQuery"
,
tsDeadLockKillQuery
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAdd
Bool
(
pCfg
,
"multiProcess"
,
tsMultiProcess
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAdd
Int32
(
pCfg
,
"multiProcess"
,
tsMultiProcess
,
0
,
2
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"monitor"
,
tsEnableMonitor
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"monitorInterval"
,
tsMonitorInterval
,
1
,
360000
,
0
)
!=
0
)
return
-
1
;
...
...
@@ -458,7 +458,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsPrintAuth
=
cfgGetItem
(
pCfg
,
"printAuth"
)
->
bval
;
tsEnableSlaveQuery
=
cfgGetItem
(
pCfg
,
"slaveQuery"
)
->
bval
;
tsDeadLockKillQuery
=
cfgGetItem
(
pCfg
,
"deadLockKillQuery"
)
->
bval
;
tsMultiProcess
=
cfgGetItem
(
pCfg
,
"multiProcess"
)
->
bval
;
tsMultiProcess
=
cfgGetItem
(
pCfg
,
"multiProcess"
)
->
i32
;
tsEnableMonitor
=
cfgGetItem
(
pCfg
,
"monitor"
)
->
bval
;
tsMonitorInterval
=
cfgGetItem
(
pCfg
,
"monitorInterval"
)
->
i32
;
...
...
source/dnode/mgmt/bnode/src/bmInt.c
浏览文件 @
ca467fd7
...
...
@@ -17,8 +17,14 @@
#include "bmInt.h"
#include "bmHandle.h"
bool
bmRequireNode
(
SMgmtWrapper
*
pWrapper
)
{
return
false
;
}
SMgmtFp
bmGetMgmtFp
()
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
NULL
;
mgmtFp
.
closeFp
=
NULL
;
mgmtFp
.
requiredFp
=
bmRequireNode
;
mgmtFp
.
getMsgHandleFp
=
bmGetMsgHandle
;
return
mgmtFp
;
}
source/dnode/mgmt/dnode/inc/dndInt.h
浏览文件 @
ca467fd7
...
...
@@ -70,9 +70,9 @@ typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg
typedef
int32_t
(
*
MndMsgFp
)(
SDnode
*
pDnode
,
SMndMsg
*
pMsg
);
typedef
SMgmtWrapper
*
(
*
MgmtOpen
Fp
)(
SDnode
*
pDnode
,
const
char
*
path
);
typedef
void
(
*
MgmtClos
eFp
)(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
);
typedef
bool
(
*
MgmtRequired
Fp
)(
SMgmtWrapper
*
pWrapper
);
typedef
SMgmtWrapper
*
(
*
OpenNode
Fp
)(
SDnode
*
pDnode
,
const
char
*
path
);
typedef
void
(
*
CloseNod
eFp
)(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
);
typedef
bool
(
*
RequireNode
Fp
)(
SMgmtWrapper
*
pWrapper
);
typedef
int32_t
(
*
MgmtHandleMsgFp
)(
SMgmtWrapper
*
pNode
,
SNodeMsg
*
pMsg
);
typedef
SMsgHandle
(
*
GetMsgHandleFp
)(
SMgmtWrapper
*
pWrapper
,
int32_t
msgIndex
);
...
...
@@ -211,9 +211,9 @@ typedef struct {
}
STransMgmt
;
typedef
struct
SMgmtFp
{
MgmtOpen
Fp
openFp
;
MgmtClos
eFp
closeFp
;
MgmtRequiredFp
requiredFp
;
OpenNode
Fp
openFp
;
CloseNod
eFp
closeFp
;
RequireNodeFp
requiredFp
;
GetMsgHandleFp
getMsgHandleFp
;
}
SMgmtFp
;
...
...
@@ -239,7 +239,7 @@ typedef struct SDnode {
STransMgmt
tmgmt
;
STfs
*
pTfs
;
SMgmtFp
fps
[
NODE_MAX
];
SMgmtWrapper
mgmt
s
[
NODE_MAX
];
SMgmtWrapper
wrapper
s
[
NODE_MAX
];
char
*
path
;
}
SDnode
;
...
...
source/dnode/mgmt/dnode/inc/dndTransport.h
浏览文件 @
ca467fd7
...
...
@@ -24,7 +24,7 @@ extern "C" {
int32_t
dndInitTrans
(
SDnode
*
pDnode
);
void
dndCleanupTrans
(
SDnode
*
pDnode
);
void
dndCleanup
Client
(
SDnode
*
pDnode
);
int32_t
dndInit
Client
(
SDnode
*
pDnode
);
int32_t
dndSendReqToMnode
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
dndSendReqToDnode
(
SDnode
*
pDnode
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pRpcMsg
);
...
...
source/dnode/mgmt/dnode/src/dndInt.c
浏览文件 @
ca467fd7
...
...
@@ -16,9 +16,76 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#include "dndHandle.h"
#include "dndTransport.h"
#include "vmInt.h"
static
int8_t
once
=
DND_ENV_INIT
;
int32_t
dndInit
()
{
if
(
atomic_val_compare_exchange_8
(
&
once
,
DND_ENV_INIT
,
DND_ENV_READY
)
!=
DND_ENV_INIT
)
{
terrno
=
TSDB_CODE_REPEAT_INIT
;
dError
(
"failed to init dnode env since %s"
,
terrstr
());
return
-
1
;
}
taosIgnSIGPIPE
();
taosBlockSIGPIPE
();
taosResolveCRC
();
if
(
rpcInit
()
!=
0
)
{
dError
(
"failed to init rpc since %s"
,
terrstr
());
dndCleanup
();
return
-
1
;
}
if
(
walInit
()
!=
0
)
{
dError
(
"failed to init wal since %s"
,
terrstr
());
dndCleanup
();
return
-
1
;
}
SVnodeOpt
vnodeOpt
=
{
0
};
vnodeOpt
.
nthreads
=
tsNumOfCommitThreads
;
vnodeOpt
.
putReqToVQueryQFp
=
dndPutReqToVQueryQ
;
vnodeOpt
.
sendReqToDnodeFp
=
dndSendReqToDnode
;
if
(
vnodeInit
(
&
vnodeOpt
)
!=
0
)
{
dError
(
"failed to init vnode since %s"
,
terrstr
());
dndCleanup
();
return
-
1
;
}
SMonCfg
monCfg
=
{
0
};
monCfg
.
maxLogs
=
tsMonitorMaxLogs
;
monCfg
.
port
=
tsMonitorPort
;
monCfg
.
server
=
tsMonitorFqdn
;
monCfg
.
comp
=
tsMonitorComp
;
if
(
monInit
(
&
monCfg
)
!=
0
)
{
dError
(
"failed to init monitor since %s"
,
terrstr
());
dndCleanup
();
return
-
1
;
}
dInfo
(
"dnode env is initialized"
);
return
0
;
}
void
dndCleanup
()
{
if
(
atomic_val_compare_exchange_8
(
&
once
,
DND_ENV_READY
,
DND_ENV_CLEANUP
)
!=
DND_ENV_READY
)
{
dError
(
"dnode env is already cleaned up"
);
return
;
}
monCleanup
();
vnodeCleanup
();
walCleanUp
();
rpcCleanup
();
taosStopCacheRefreshWorker
();
dInfo
(
"dnode env is cleaned up"
);
}
SMgmtWrapper
*
dndGetWrapper
(
SDnode
*
pDnode
,
ENodeType
nodeType
)
{
return
&
pDnode
->
wrappers
[
nodeType
];
}
EDndStatus
dndGetStatus
(
SDnode
*
pDnode
)
{
return
pDnode
->
status
;
}
void
dndSetStatus
(
SDnode
*
pDnode
,
EDndStatus
status
)
{
...
...
@@ -75,76 +142,7 @@ TdFilePtr dndCheckRunning(char *dataDir) {
return
pFile
;
}
int32_t
dndInit
()
{
if
(
atomic_val_compare_exchange_8
(
&
once
,
DND_ENV_INIT
,
DND_ENV_READY
)
!=
DND_ENV_INIT
)
{
terrno
=
TSDB_CODE_REPEAT_INIT
;
dError
(
"failed to init dnode env since %s"
,
terrstr
());
return
-
1
;
}
taosIgnSIGPIPE
();
taosBlockSIGPIPE
();
taosResolveCRC
();
if
(
rpcInit
()
!=
0
)
{
dError
(
"failed to init rpc since %s"
,
terrstr
());
dndCleanup
();
return
-
1
;
}
if
(
walInit
()
!=
0
)
{
dError
(
"failed to init wal since %s"
,
terrstr
());
dndCleanup
();
return
-
1
;
}
// SVnodeOpt vnodeOpt = {
// .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp =
// dndSendReqToDnode};
// if (vnodeInit(&vnodeOpt) != 0) {
// dError("failed to init vnode since %s", terrstr());
// dndCleanup();
// return -1;
// }
SMonCfg
monCfg
=
{.
maxLogs
=
tsMonitorMaxLogs
,
.
port
=
tsMonitorPort
,
.
server
=
tsMonitorFqdn
,
.
comp
=
tsMonitorComp
};
if
(
monInit
(
&
monCfg
)
!=
0
)
{
dError
(
"failed to init monitor since %s"
,
terrstr
());
dndCleanup
();
return
-
1
;
}
dInfo
(
"dnode env is initialized"
);
return
0
;
}
void
dndCleanup
()
{
if
(
atomic_val_compare_exchange_8
(
&
once
,
DND_ENV_READY
,
DND_ENV_CLEANUP
)
!=
DND_ENV_READY
)
{
dError
(
"dnode env is already cleaned up"
);
return
;
}
walCleanUp
();
// vnodeCleanup();
rpcCleanup
();
monCleanup
();
taosStopCacheRefreshWorker
();
dInfo
(
"dnode env is cleaned up"
);
}
void
dndeHandleEvent
(
SDnode
*
pDnode
,
EDndEvent
event
)
{
dInfo
(
"dnode object receive event %d, data:%p"
,
event
,
pDnode
);
pDnode
->
event
=
event
;
}
SMgmtWrapper
*
dndGetWrapper
(
SDnode
*
pDnode
,
ENodeType
nodeType
)
{
return
&
pDnode
->
mgmts
[
nodeType
];
}
SMgmtFp
dndGetMgmtFp
()
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
getMsgHandleFp
=
dndGetMsgHandle
;
return
mgmtFp
;
}
source/dnode/mgmt/dnode/src/dndMain.c
浏览文件 @
ca467fd7
...
...
@@ -45,7 +45,7 @@ static bool dndRequireNode(SMgmtWrapper *pMgmt) {
static
void
dndClearMemory
(
SDnode
*
pDnode
)
{
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pMgmt
=
&
pDnode
->
mgmt
s
[
n
];
SMgmtWrapper
*
pMgmt
=
&
pDnode
->
wrapper
s
[
n
];
tfree
(
pMgmt
->
path
);
}
if
(
pDnode
->
pLockFile
!=
NULL
)
{
...
...
@@ -120,34 +120,35 @@ SDnode *dndCreate(SDndCfg *pCfg) {
goto
_OVER
;
}
pDnode
->
mgmts
[
DNODE
].
fp
=
dndGetMgmtFp
();
pDnode
->
mgmts
[
MNODE
].
fp
=
mmGetMgmtFp
();
pDnode
->
mgmts
[
VNODES
].
fp
=
vmGetMgmtFp
();
pDnode
->
mgmts
[
QNODE
].
fp
=
qmGetMgmtFp
();
pDnode
->
mgmts
[
SNODE
].
fp
=
smGetMgmtFp
();
pDnode
->
mgmts
[
BNODE
].
fp
=
bmGetMgmtFp
();
pDnode
->
mgmts
[
MNODE
].
name
=
"mnode"
;
pDnode
->
mgmts
[
VNODES
].
name
=
"vnodes"
;
pDnode
->
mgmts
[
QNODE
].
name
=
"qnode"
;
pDnode
->
mgmts
[
SNODE
].
name
=
"snode"
;
pDnode
->
mgmts
[
BNODE
].
name
=
"bnode"
;
pDnode
->
wrappers
[
DNODE
].
fp
=
dndGetMgmtFp
();
pDnode
->
wrappers
[
MNODE
].
fp
=
mmGetMgmtFp
();
pDnode
->
wrappers
[
VNODES
].
fp
=
vmGetMgmtFp
();
pDnode
->
wrappers
[
QNODE
].
fp
=
qmGetMgmtFp
();
pDnode
->
wrappers
[
SNODE
].
fp
=
smGetMgmtFp
();
pDnode
->
wrappers
[
BNODE
].
fp
=
bmGetMgmtFp
();
pDnode
->
wrappers
[
DNODE
].
name
=
"dnode"
;
pDnode
->
wrappers
[
MNODE
].
name
=
"mnode"
;
pDnode
->
wrappers
[
VNODES
].
name
=
"vnodes"
;
pDnode
->
wrappers
[
QNODE
].
name
=
"qnode"
;
pDnode
->
wrappers
[
SNODE
].
name
=
"snode"
;
pDnode
->
wrappers
[
BNODE
].
name
=
"bnode"
;
memcpy
(
&
pDnode
->
cfg
,
pCfg
,
sizeof
(
SDndCfg
));
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
p
Mgmt
=
&
pDnode
->
mgmt
s
[
n
];
snprintf
(
path
,
sizeof
(
path
),
"%s%s%s"
,
pCfg
->
dataDir
,
TD_DIRSEP
,
pDnode
->
mgmt
s
[
n
].
name
);
p
Mgmt
->
path
=
strdup
(
path
);
if
(
pDnode
->
mgmt
s
[
n
].
path
==
NULL
)
{
SMgmtWrapper
*
p
Wrapper
=
&
pDnode
->
wrapper
s
[
n
];
snprintf
(
path
,
sizeof
(
path
),
"%s%s%s"
,
pCfg
->
dataDir
,
TD_DIRSEP
,
pDnode
->
wrapper
s
[
n
].
name
);
p
Wrapper
->
path
=
strdup
(
path
);
if
(
pDnode
->
wrapper
s
[
n
].
path
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_OVER
;
}
p
Mgmt
->
procType
=
PROC_SINGLE
;
p
Mgmt
->
required
=
dndRequireNode
(
pMgmt
);
if
(
p
Mgmt
->
required
)
{
if
(
taosMkDir
(
p
Mgmt
->
path
)
!=
0
)
{
p
Wrapper
->
procType
=
PROC_SINGLE
;
p
Wrapper
->
required
=
dndRequireNode
(
pWrapper
);
if
(
p
Wrapper
->
required
)
{
if
(
taosMkDir
(
p
Wrapper
->
path
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to create dir:%s since %s"
,
p
Mgmt
->
path
,
terrstr
());
dError
(
"failed to create dir:%s since %s"
,
p
Wrapper
->
path
,
terrstr
());
goto
_OVER
;
}
}
...
...
@@ -158,6 +159,8 @@ SDnode *dndCreate(SDndCfg *pCfg) {
goto
_OVER
;
}
code
=
0
;
_OVER:
if
(
code
!=
0
&&
pDnode
)
{
dndClearMemory
(
pDnode
);
...
...
@@ -187,12 +190,118 @@ void dndClose(SDnode *pDnode) {
dInfo
(
"dnode object is closed, data:%p"
,
pDnode
);
}
void
dndRun
(
SDnode
*
pDnode
)
{
while
(
pDnode
->
event
!=
DND_EVENT_STOP
)
{
static
int32_t
dndOpenNode
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
)
{
// if (tsMultiProcess) {
// SProcCfg cfg = {0};
// pWrapper->pProc = taosProcInit(&cfg);
// if (taosProcIsChild(pWrapper->pProc)) {
// pWrapper->procType = PROC_CHILD;
// dInfo("node:%s, will start in child process", pWrapper->name);
// } else {
// pWrapper->procType = PROC_PARENT;
// dInfo("node:%s, will start in parent process", pWrapper->name);
// }
// } else {
// pWrapper->procType = PROC_SINGLE;
// dInfo("node:%s, will start in single process mnode", pWrapper->name);
// }
// if (pWrapper->procType == PROC_SINGLE || pWrapper->procType == PROC_CHILD) {
// SDndInfo info;
// pWrapper->pNode = (*pWrapper->fp.openFp)(pWrapper->path, &info);
// if (pWrapper != NULL) {
// return -1;
// }
// }
// return 0;
pWrapper
->
pMgmt
=
(
*
pWrapper
->
fp
.
openFp
)(
pDnode
,
pWrapper
->
path
);
return
0
;
}
static
void
dndClearNodeExecpt
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
){}
static
int32_t
dndRunInSingleProcess
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode run in single process mode"
);
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
dInfo
(
"node:%s, will start in single process"
,
pWrapper
->
name
);
if
(
dndOpenNode
(
pDnode
,
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
}
return
0
;
}
static
int32_t
dndRunInMultiProcess
(
SDnode
*
pDnode
)
{
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
n
==
DNODE
)
{
dInfo
(
"node:%s, will start in parent process"
,
pWrapper
->
name
);
pWrapper
->
procType
=
PROC_PARENT
;
if
(
dndOpenNode
(
pDnode
,
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
continue
;
}
SProcCfg
cfg
=
{
0
};
SProcObj
*
pProc
=
taosProcInit
(
&
cfg
);
if
(
pProc
==
NULL
)
{
dError
(
"node:%s, failed to fork since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
pWrapper
->
pProc
=
pProc
;
if
(
taosProcIsChild
(
pProc
))
{
dInfo
(
"node:%s, will start in child process"
,
pWrapper
->
name
);
pWrapper
->
procType
=
PROC_CHILD
;
dndResetLog
(
pWrapper
);
dInfo
(
"node:%s, clean up resources inherited from parent"
,
pWrapper
->
name
);
dndClearNodeExecpt
(
pDnode
,
pWrapper
);
dInfo
(
"node:%s, init trans client in child process"
,
pWrapper
->
name
);
dndInitClient
(
pDnode
);
dInfo
(
"node:%s, will be initialized in child process"
,
pWrapper
->
name
);
dndOpenNode
(
pDnode
,
pWrapper
);
}
else
{
dInfo
(
"node:%s, will not start in parent process"
,
pWrapper
->
name
);
pWrapper
->
procType
=
PROC_PARENT
;
dndOpenNode
(
pDnode
,
pWrapper
);
}
}
return
0
;
}
int32_t
dndRun
(
SDnode
*
pDnode
)
{
if
(
tsMultiProcess
==
0
)
{
if
(
dndRunInSingleProcess
(
pDnode
)
!=
0
)
{
dError
(
"failed to run dnode in single process mode since %s"
,
terrstr
());
return
-
1
;
}
}
else
{
if
(
dndRunInMultiProcess
(
pDnode
)
!=
0
)
{
dError
(
"failed to run dnode in multi process mode since %s"
,
terrstr
());
return
-
1
;
}
}
while
(
1
)
{
if
(
pDnode
->
event
!=
DND_EVENT_STOP
)
{
dInfo
(
"dnode object receive stop event"
);
break
;
}
taosMsleep
(
100
);
}
return
0
;
}
void
dndProcessRpcMsg
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
}
\ No newline at end of file
void
dndProcessRpcMsg
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{}
\ No newline at end of file
source/dnode/mgmt/dnode/src/dndMgmt.c
浏览文件 @
ca467fd7
...
...
@@ -15,6 +15,8 @@
#define _DEFAULT_SOURCE
#include "dndMgmt.h"
#include "dndHandle.h"
#include "dndMonitor.h"
// #include "dndBnode.h"
// #include "mm.h"
...
...
@@ -442,4 +444,15 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {}
void
dndProcessStartupReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
){}
void
dndProcessMgmtMsg
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
){}
\ No newline at end of file
void
dndProcessMgmtMsg
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
){}
bool
dndRequireNode
(
SMgmtWrapper
*
pWrapper
)
{
return
true
;
}
SMgmtFp
dndGetMgmtFp
()
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
NULL
;
mgmtFp
.
closeFp
=
NULL
;
mgmtFp
.
requiredFp
=
dndRequireNode
;
mgmtFp
.
getMsgHandleFp
=
dndGetMsgHandle
;
return
mgmtFp
;
}
source/dnode/mgmt/dnode/src/dndTransport.c
浏览文件 @
ca467fd7
...
...
@@ -13,12 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/* this file is mainly responsible for the communication between DNODEs. Each
* dnode works as both server and client. Dnode may send status, grant, config
* messages to mnode, mnode may send create/alter/drop table/vnode messages
* to dnode. All theses messages are handled from here
*/
#define _DEFAULT_SOURCE
#include "dndTransport.h"
#include "dndMgmt.h"
...
...
@@ -52,7 +46,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
}
}
static
int32_t
dndInitClient
(
SDnode
*
pDnode
)
{
int32_t
dndInitClient
(
SDnode
*
pDnode
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
SRpcInit
rpcInit
;
...
...
@@ -256,7 +250,7 @@ static int32_t dndSetMsgHandle(SDnode *pDnode) {
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
for
(
ENodeType
nodeType
=
0
;
nodeType
<
NODE_MAX
;
++
nodeType
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
mgmt
s
[
nodeType
];
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrapper
s
[
nodeType
];
GetMsgHandleFp
getMsgHandleFp
=
pDnode
->
fps
[
nodeType
].
getMsgHandleFp
;
if
(
getMsgHandleFp
==
NULL
)
continue
;
...
...
source/dnode/mgmt/exec/src/dndExec.c
浏览文件 @
ca467fd7
...
...
@@ -82,14 +82,14 @@ static int32_t dndRunDnode() {
}
dInfo
(
"start the TDengine service"
);
dndRun
(
pDnode
);
int32_t
code
=
dndRun
(
pDnode
);
dInfo
(
"start shutting down the TDengine service"
);
dndClose
(
pDnode
);
dndCleanup
();
taosCloseLog
();
taosCleanupCfg
();
return
0
;
return
code
;
}
int
main
(
int
argc
,
char
const
*
argv
[])
{
...
...
source/dnode/mgmt/mnode/src/mmInt.c
浏览文件 @
ca467fd7
...
...
@@ -17,13 +17,18 @@
#include "mmInt.h"
#include "mmHandle.h"
bool
mmRequireNode
(
SMgmtWrapper
*
pWrapper
)
{
return
false
;
}
SMgmtFp
mmGetMgmtFp
()
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
NULL
;
mgmtFp
.
closeFp
=
NULL
;
mgmtFp
.
requiredFp
=
mmRequireNode
;
mgmtFp
.
getMsgHandleFp
=
mmGetMsgHandle
;
return
mgmtFp
;
}
int32_t
mmGetUserAuth
(
SMgmtWrapper
*
pWrapper
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
return
0
;
}
source/dnode/mgmt/qnode/src/qmInt.c
浏览文件 @
ca467fd7
...
...
@@ -17,8 +17,13 @@
#include "qmInt.h"
#include "qmHandle.h"
bool
qmRequireNode
(
SMgmtWrapper
*
pWrapper
)
{
return
false
;
}
SMgmtFp
qmGetMgmtFp
()
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
NULL
;
mgmtFp
.
closeFp
=
NULL
;
mgmtFp
.
requiredFp
=
qmRequireNode
;
mgmtFp
.
getMsgHandleFp
=
qmGetMsgHandle
;
return
mgmtFp
;
}
source/dnode/mgmt/snode/src/smInt.c
浏览文件 @
ca467fd7
...
...
@@ -17,8 +17,14 @@
#include "smInt.h"
#include "smHandle.h"
bool
smRequireNode
(
SMgmtWrapper
*
pWrapper
)
{
return
false
;
}
SMgmtFp
smGetMgmtFp
()
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
NULL
;
mgmtFp
.
closeFp
=
NULL
;
mgmtFp
.
requiredFp
=
smRequireNode
;
mgmtFp
.
getMsgHandleFp
=
smGetMsgHandle
;
return
mgmtFp
;
}
source/dnode/mgmt/vnode/src/vmInt.c
浏览文件 @
ca467fd7
...
...
@@ -17,8 +17,14 @@
#include "vmInt.h"
#include "vmHandle.h"
bool
vmRequireNode
(
SMgmtWrapper
*
pWrapper
)
{
return
false
;
}
SMgmtFp
vmGetMgmtFp
()
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
NULL
;
mgmtFp
.
closeFp
=
NULL
;
mgmtFp
.
requiredFp
=
vmRequireNode
;
mgmtFp
.
getMsgHandleFp
=
vmGetMsgHandle
;
return
mgmtFp
;
}
source/dnode/mgmt/vnode/src/vmMgmt.c
浏览文件 @
ca467fd7
...
...
@@ -14,9 +14,9 @@
*/
#define _DEFAULT_SOURCE
// #include "dndVnodes
.h"
//
#include "dndMgmt.h"
//
#include "dndTransport.h"
#include "vmMgmt
.h"
#include "dndMgmt.h"
#include "dndTransport.h"
// #include "sync.h"
#if 0
...
...
@@ -1024,4 +1024,35 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
}
#endif
\ No newline at end of file
#endif
// int32_t dndInitVnodes(SDnode *pDnode) {
// SVnodeOpt vnodeOpt = {0};
// vnodeOpt.nthreads = tsNumOfCommitThreads;
// vnodeOpt.putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp =
// dndSendReqToDnode};
// if (vnodeInit(&vnodeOpt) != 0) {
// dError("failed to init vnode since %s", terrstr());
// dndCleanup();
// return -1;
// // }
// if (walInit() != 0) {
// dError("failed to init wal since %s", terrstr());
// dndCleanup();
// return -1;
// }
// }
// void dndCleanupVnodes(SDnode *pDnode) {
// // vnodeCleanup();
// walCleanUp();
// }
int32_t
dndPutReqToVQueryQ
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
){
return
0
;}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录