Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dcfd63a2
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
dcfd63a2
编写于
3月 30, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
3月 30, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11146 from taosdata/feature/shm
adjust shm queue
上级
5a6bf4c3
d90ebef8
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
377 addition
and
406 deletion
+377
-406
include/os/osShm.h
include/os/osShm.h
+0
-1
include/util/tprocess.h
include/util/tprocess.h
+3
-7
source/dnode/mgmt/dm/src/dmFile.c
source/dnode/mgmt/dm/src/dmFile.c
+1
-1
source/dnode/mgmt/dm/src/dmInt.c
source/dnode/mgmt/dm/src/dmInt.c
+14
-0
source/dnode/mgmt/main/exe/dndMain.c
source/dnode/mgmt/main/exe/dndMain.c
+4
-0
source/dnode/mgmt/main/inc/dnd.h
source/dnode/mgmt/main/inc/dnd.h
+6
-2
source/dnode/mgmt/main/inc/dndInt.h
source/dnode/mgmt/main/inc/dndInt.h
+3
-7
source/dnode/mgmt/main/src/dndExec.c
source/dnode/mgmt/main/src/dndExec.c
+108
-108
source/dnode/mgmt/main/src/dndFile.c
source/dnode/mgmt/main/src/dndFile.c
+40
-35
source/dnode/mgmt/main/src/dndInt.c
source/dnode/mgmt/main/src/dndInt.c
+2
-2
source/dnode/mgmt/main/src/dndObj.c
source/dnode/mgmt/main/src/dndObj.c
+27
-26
source/os/src/osShm.c
source/os/src/osShm.c
+6
-17
source/util/src/tprocess.c
source/util/src/tprocess.c
+163
-200
未找到文件。
include/os/osShm.h
浏览文件 @
dcfd63a2
...
...
@@ -29,7 +29,6 @@ typedef struct {
int32_t
taosCreateShm
(
SShm
*
pShm
,
int32_t
shmsize
)
;
void
taosDropShm
(
SShm
*
pShm
);
int32_t
taosAttachShm
(
SShm
*
pShm
);
void
taosDetachShm
(
SShm
*
pShm
);
#ifdef __cplusplus
}
...
...
include/util/tprocess.h
浏览文件 @
dcfd63a2
...
...
@@ -32,29 +32,25 @@ typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void
ProcFuncType
ftype
);
typedef
struct
{
int32_t
childQueueSize
;
ProcConsumeFp
childConsumeFp
;
ProcMallocFp
childMallocHeadFp
;
ProcFreeFp
childFreeHeadFp
;
ProcMallocFp
childMallocBodyFp
;
ProcFreeFp
childFreeBodyFp
;
int32_t
parentQueueSize
;
ProcConsumeFp
parentConsumeFp
;
ProcMallocFp
parent
d
MallocHeadFp
;
ProcMallocFp
parentMallocHeadFp
;
ProcFreeFp
parentFreeHeadFp
;
ProcMallocFp
parentMallocBodyFp
;
ProcFreeFp
parentFreeBodyFp
;
bool
testFlag
;
SShm
shm
;
void
*
pParent
;
const
char
*
name
;
bool
isChild
;
}
SProcCfg
;
SProcObj
*
taosProcInit
(
const
SProcCfg
*
pCfg
);
void
taosProcCleanup
(
SProcObj
*
pProc
);
int32_t
taosProcRun
(
SProcObj
*
pProc
);
void
taosProcStop
(
SProcObj
*
pProc
);
bool
taosProcIsChild
(
SProcObj
*
pProc
);
int32_t
taosProcChildId
(
SProcObj
*
pProc
);
int32_t
taosProcPutToChildQ
(
SProcObj
*
pProc
,
const
void
*
pHead
,
int16_t
headLen
,
const
void
*
pBody
,
int32_t
bodyLen
,
ProcFuncType
ftype
);
int32_t
taosProcPutToParentQ
(
SProcObj
*
pProc
,
const
void
*
pHead
,
int16_t
headLen
,
const
void
*
pBody
,
int32_t
bodyLen
,
...
...
source/dnode/mgmt/dm/src/dmFile.c
浏览文件 @
dcfd63a2
...
...
@@ -130,7 +130,7 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) {
}
code
=
0
;
d
Info
(
"succcessed to read file %s"
,
file
);
d
Debug
(
"succcessed to read file %s"
,
file
);
dmPrintDnodes
(
pMgmt
);
PRASE_DNODE_OVER:
...
...
source/dnode/mgmt/dm/src/dmInt.c
浏览文件 @
dcfd63a2
...
...
@@ -112,6 +112,16 @@ int32_t dmInit(SMgmtWrapper *pWrapper) {
return
-
1
;
}
if
(
dndInitServer
(
pDnode
)
!=
0
)
{
dError
(
"failed to init trans server since %s"
,
terrstr
());
return
-
1
;
}
if
(
dndInitClient
(
pDnode
)
!=
0
)
{
dError
(
"failed to init trans client since %s"
,
terrstr
());
return
-
1
;
}
pWrapper
->
pMgmt
=
pMgmt
;
dInfo
(
"dnode-mgmt is initialized"
);
return
0
;
...
...
@@ -122,6 +132,7 @@ void dmCleanup(SMgmtWrapper *pWrapper) {
if
(
pMgmt
==
NULL
)
return
;
dInfo
(
"dnode-mgmt start to clean up"
);
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
dmStopWorker
(
pMgmt
);
taosWLockLatch
(
&
pMgmt
->
latch
);
...
...
@@ -140,6 +151,9 @@ void dmCleanup(SMgmtWrapper *pWrapper) {
taosMemoryFree
(
pMgmt
);
pWrapper
->
pMgmt
=
NULL
;
dndCleanupServer
(
pDnode
);
dndCleanupClient
(
pDnode
);
dInfo
(
"dnode-mgmt is cleaned up"
);
}
...
...
source/dnode/mgmt/main/exe/dndMain.c
浏览文件 @
dcfd63a2
...
...
@@ -83,6 +83,10 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) {
global
.
generateGrant
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
global
.
ntype
=
atoi
(
argv
[
++
i
]);
if
(
global
.
ntype
<=
DNODE
||
global
.
ntype
>
NODE_MAX
)
{
printf
(
"'-n' range is [1-5], default is 0
\n
"
);
return
-
1
;
}
}
else
if
(
strcmp
(
argv
[
i
],
"-C"
)
==
0
)
{
global
.
dumpConfig
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-V"
)
==
0
)
{
...
...
source/dnode/mgmt/main/inc/dnd.h
浏览文件 @
dcfd63a2
...
...
@@ -128,7 +128,7 @@ typedef struct SDnode {
EDndStatus
status
;
EDndEvent
event
;
SStartupReq
startup
;
TdFilePtr
runtimeF
ile
;
TdFilePtr
lockf
ile
;
STransMgmt
trans
;
SMgmtWrapper
wrappers
[
NODE_MAX
];
}
SDnode
;
...
...
@@ -141,13 +141,17 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp no
void
dndReportStartup
(
SDnode
*
pDnode
,
const
char
*
pName
,
const
char
*
pDesc
);
void
dndSendMonitorReport
(
SDnode
*
pDnode
);
int32_t
dndInitServer
(
SDnode
*
pDnode
);
void
dndCleanupServer
(
SDnode
*
pDnode
);
int32_t
dndInitClient
(
SDnode
*
pDnode
);
void
dndCleanupClient
(
SDnode
*
pDnode
);
int32_t
dndProcessNodeMsg
(
SDnode
*
pDnode
,
SNodeMsg
*
pMsg
);
int32_t
dndSendReqToMnode
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
dndSendReqToDnode
(
SMgmtWrapper
*
pWrapper
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
void
dndSendRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
);
void
dndRegisterBrokenLinkArg
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
SMsgCb
dndCreateMsgcb
(
SMgmtWrapper
*
pWrapper
);
int32_t
dndProcessNodeMsg
(
SDnode
*
pDnode
,
SNodeMsg
*
pMsg
);
int32_t
dndReadFile
(
SMgmtWrapper
*
pWrapper
,
bool
*
pDeployed
);
int32_t
dndWriteFile
(
SMgmtWrapper
*
pWrapper
,
bool
deployed
);
...
...
source/dnode/mgmt/main/inc/dndInt.h
浏览文件 @
dcfd63a2
...
...
@@ -50,17 +50,13 @@ void dndClose(SDnode *pDnode);
void
dndHandleEvent
(
SDnode
*
pDnode
,
EDndEvent
event
);
// dndTransport.c
int32_t
dndInitServer
(
SDnode
*
pDnode
);
void
dndCleanupServer
(
SDnode
*
pDnode
);
int32_t
dndInitClient
(
SDnode
*
pDnode
);
void
dndCleanupClient
(
SDnode
*
pDnode
);
int32_t
dndInitMsgHandle
(
SDnode
*
pDnode
);
void
dndSendRpcRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
);
// dndFile.c
int32_t
dndOpenRuntimeFile
(
SDnode
*
pDnode
);
int32_t
dndWriteRuntime
File
(
SDnode
*
pDnode
);
void
dndCloseRuntime
File
(
SDnode
*
pDnode
);
TdFilePtr
dndCheckRunning
(
const
char
*
dataDir
);
int32_t
dndReadShm
File
(
SDnode
*
pDnode
);
int32_t
dndWriteShm
File
(
SDnode
*
pDnode
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/main/src/dndExec.c
浏览文件 @
dcfd63a2
...
...
@@ -16,15 +16,6 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
static
void
dndResetLog
(
SMgmtWrapper
*
pMgmt
)
{
char
logname
[
24
]
=
{
0
};
snprintf
(
logname
,
sizeof
(
logname
),
"%slog"
,
pMgmt
->
name
);
dInfo
(
"node:%s, reset log to %s in child process"
,
pMgmt
->
name
,
logname
);
taosCloseLog
();
taosInitLog
(
logname
,
1
);
}
static
bool
dndRequireNode
(
SMgmtWrapper
*
pWrapper
)
{
bool
required
=
false
;
int32_t
code
=
(
*
pWrapper
->
fp
.
requiredFp
)(
pWrapper
,
&
required
);
...
...
@@ -37,14 +28,18 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) {
}
int32_t
dndOpenNode
(
SMgmtWrapper
*
pWrapper
)
{
int32_t
code
=
(
*
pWrapper
->
fp
.
openFp
)(
pWrapper
);
if
(
code
!=
0
)
{
if
(
taosMkDir
(
pWrapper
->
path
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"node:%s, failed to create dir:%s since %s"
,
pWrapper
->
name
,
pWrapper
->
path
,
terrstr
());
return
-
1
;
}
if
((
*
pWrapper
->
fp
.
openFp
)(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to open since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
else
{
dDebug
(
"node:%s, has been opened"
,
pWrapper
->
name
);
}
dDebug
(
"node:%s, has been opened"
,
pWrapper
->
name
);
pWrapper
->
deployed
=
true
;
return
0
;
}
...
...
@@ -71,23 +66,13 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
}
static
int32_t
dndRunInSingleProcess
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode
run in single process mode
"
);
dInfo
(
"dnode
start to run in single process
"
);
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
for
(
ENodeType
n
=
DNODE
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
pWrapper
->
required
=
dndRequireNode
(
pWrapper
);
if
(
!
pWrapper
->
required
)
continue
;
SMsgCb
msgCb
=
dndCreateMsgcb
(
pWrapper
);
tmsgSetDefaultMsgCb
(
&
msgCb
);
if
(
taosMkDir
(
pWrapper
->
path
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to create dir:%s since %s"
,
pWrapper
->
path
,
terrstr
());
return
-
1
;
}
dInfo
(
"node:%s, will start in single process"
,
pWrapper
->
name
);
pWrapper
->
procType
=
PROC_SINGLE
;
if
(
dndOpenNode
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
...
...
@@ -106,18 +91,10 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
}
}
dInfo
(
"dnode running in single process"
);
return
0
;
}
static
void
dndClearNodesExecpt
(
SDnode
*
pDnode
,
ENodeType
except
)
{
// dndCleanupServer(pDnode);
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
if
(
except
==
n
)
continue
;
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
pWrapper
->
required
=
false
;
}
}
static
void
dndConsumeChildQueue
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
,
int16_t
msgLen
,
void
*
pCont
,
int32_t
contLen
,
ProcFuncType
ftype
)
{
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
...
...
@@ -163,115 +140,138 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
taosMemoryFree
(
pMsg
);
}
static
int32_t
dndRunInMultiProcess
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode run in multi process mode"
);
static
int32_t
dndRunInParentProcess
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode start to run in parent process"
);
SMgmtWrapper
*
pDWrapper
=
&
pDnode
->
wrappers
[
DNODE
];
if
(
dndOpenNode
(
pDWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pDWrapper
->
name
,
terrstr
());
return
-
1
;
}
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
for
(
ENodeType
n
=
DNODE
+
1
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
pWrapper
->
required
=
dndRequireNode
(
pWrapper
);
if
(
!
pWrapper
->
required
)
continue
;
SMsgCb
msgCb
=
dndCreateMsgcb
(
pWrapper
);
tmsgSetDefaultMsgCb
(
&
msgCb
);
if
(
taosMkDir
(
pWrapper
->
path
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to create dir:%s since %s"
,
pWrapper
->
path
,
terrstr
());
int64_t
shmsize
=
1024
*
1024
*
2
;
// size will be a configuration item
if
(
taosCreateShm
(
&
pWrapper
->
shm
,
shmsize
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
terrno
);
dError
(
"node:%s, failed to create shm size:%"
PRId64
" since %s"
,
pWrapper
->
name
,
shmsize
,
terrstr
());
return
-
1
;
}
if
(
n
==
DNODE
)
{
dInfo
(
"node:%s, will start in parent process"
,
pWrapper
->
name
);
pWrapper
->
procType
=
PROC_SINGLE
;
if
(
dndOpenNode
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
continue
;
SProcCfg
cfg
=
{.
childConsumeFp
=
(
ProcConsumeFp
)
dndConsumeChildQueue
,
.
childMallocHeadFp
=
(
ProcMallocFp
)
taosAllocateQitem
,
.
childFreeHeadFp
=
(
ProcFreeFp
)
taosFreeQitem
,
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
childFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
parentConsumeFp
=
(
ProcConsumeFp
)
dndConsumeParentQueue
,
.
parentMallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
shm
,
.
pParent
=
pWrapper
,
.
isChild
=
false
,
.
name
=
pWrapper
->
name
};
pWrapper
->
procType
=
PROC_PARENT
;
pWrapper
->
pProc
=
taosProcInit
(
&
cfg
);
if
(
pWrapper
->
pProc
==
NULL
)
{
dError
(
"node:%s, failed to create proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
}
if
(
dndWriteShmFile
(
pDnode
)
!=
0
)
{
dError
(
"failed to write runtime file since %s"
,
terrstr
());
return
-
1
;
}
SProcCfg
cfg
=
{.
childQueueSize
=
1024
*
1024
*
2
,
// size will be a configuration item
.
childConsumeFp
=
(
ProcConsumeFp
)
dndConsumeChildQueue
,
.
childMallocHeadFp
=
(
ProcMallocFp
)
taosAllocateQitem
,
.
childFreeHeadFp
=
(
ProcFreeFp
)
taosFreeQitem
,
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
childFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
parentQueueSize
=
1024
*
1024
*
2
,
// size will be a configuration item
.
parentConsumeFp
=
(
ProcConsumeFp
)
dndConsumeParentQueue
,
.
parentdMallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
pParent
=
pWrapper
,
.
name
=
pWrapper
->
name
};
SProcObj
*
pProc
=
taosProcInit
(
&
cfg
);
if
(
pProc
==
NULL
)
{
dError
(
"node:%s, failed to fork since %s"
,
pWrapper
->
name
,
terrstr
());
for
(
ENodeType
n
=
DNODE
+
1
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
dInfo
(
"node:%s, will not start in parent process"
,
pWrapper
->
name
);
// exec new node
if
(
taosProcRun
(
pWrapper
->
pProc
)
!=
0
)
{
dError
(
"node:%s, failed to run proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
}
pWrapper
->
pProc
=
pProc
;
dndSetStatus
(
pDnode
,
DND_STAT_RUNNING
)
;
if
(
taosProcIsChild
(
pProc
)
)
{
dInfo
(
"node:%s, will start in child process"
,
pWrapper
->
name
);
pWrapper
->
procType
=
PROC_CHILD
;
dndResetLog
(
pWrapper
);
if
((
*
pDWrapper
->
fp
.
startFp
)(
pDWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pDWrapper
->
name
,
terrstr
()
);
return
-
1
;
}
dInfo
(
"node:%s, clean up resources inherited from parent"
,
pWrapper
->
name
);
dndClearNodesExecpt
(
pDnode
,
n
);
dInfo
(
"dnode running in parent process"
);
return
0
;
}
dInfo
(
"node:%s, will be initialized in child process"
,
pWrapper
->
name
);
if
(
dndOpenNode
(
pWrapper
)
!=
0
)
{
dInfo
(
"node:%s, failed to init in child process since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
static
int32_t
dndRunInChildProcess
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode start to run in child process"
);
if
(
taosProcRun
(
pProc
)
!=
0
)
{
dError
(
"node:%s, failed to run proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
break
;
}
else
{
dInfo
(
"node:%s, will not start in parent process, child pid:%d"
,
pWrapper
->
name
,
taosProcChildId
(
pProc
));
pWrapper
->
procType
=
PROC_PARENT
;
if
(
taosProcRun
(
pProc
)
!=
0
)
{
dError
(
"node:%s, failed to run proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
}
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
pDnode
->
ntype
];
if
(
dndOpenNode
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
dndSetStatus
(
pDnode
,
DND_STAT_RUNNING
);
SProcCfg
cfg
=
{.
childConsumeFp
=
(
ProcConsumeFp
)
dndConsumeChildQueue
,
.
childMallocHeadFp
=
(
ProcMallocFp
)
taosAllocateQitem
,
.
childFreeHeadFp
=
(
ProcFreeFp
)
taosFreeQitem
,
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
childFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
parentConsumeFp
=
(
ProcConsumeFp
)
dndConsumeParentQueue
,
.
parentMallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
shm
,
.
pParent
=
pWrapper
,
.
isChild
=
true
,
.
name
=
pWrapper
->
name
};
pWrapper
->
procType
=
PROC_CHILD
;
pWrapper
->
pProc
=
taosProcInit
(
&
cfg
);
if
(
pWrapper
->
pProc
==
NULL
)
{
dError
(
"node:%s, failed to create proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
pWrapper
->
fp
.
startFp
==
NULL
)
continue
;
if
(
pWrapper
->
procType
==
PROC_PARENT
&&
n
!=
DNODE
)
continue
;
if
(
pWrapper
->
procType
==
PROC_CHILD
&&
n
==
DNODE
)
continue
;
if
((
*
pWrapper
->
fp
.
startFp
)(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to start since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
if
(
taosProcRun
(
pWrapper
->
pProc
)
!=
0
)
{
dError
(
"node:%s, failed to run proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
dInfo
(
"dnode running in child process"
);
return
0
;
}
int32_t
dndRun
(
SDnode
*
pDnode
)
{
int32_t
dndRun
(
SDnode
*
pDnode
)
{
if
(
!
tsMultiProcess
)
{
if
(
dndRunInSingleProcess
(
pDnode
)
!=
0
)
{
dError
(
"failed to run dnode in single process mode since %s"
,
terrstr
());
dError
(
"failed to run dnode since %s"
,
terrstr
());
return
-
1
;
}
}
else
if
(
pDnode
->
ntype
==
DNODE
)
{
if
(
dndRunInParentProcess
(
pDnode
)
!=
0
)
{
dError
(
"failed to run dnode in parent process since %s"
,
terrstr
());
return
-
1
;
}
}
else
{
if
(
dndRunIn
Multi
Process
(
pDnode
)
!=
0
)
{
dError
(
"failed to run dnode in
multi process mode
since %s"
,
terrstr
());
if
(
dndRunIn
Child
Process
(
pDnode
)
!=
0
)
{
dError
(
"failed to run dnode in
child process
since %s"
,
terrstr
());
return
-
1
;
}
}
dndReportStartup
(
pDnode
,
"TDengine"
,
"initialized successfully"
);
dInfo
(
"TDengine initialized successfully"
);
while
(
1
)
{
if
(
pDnode
->
event
==
DND_EVENT_STOP
)
{
...
...
source/dnode/mgmt/main/src/dndFile.c
浏览文件 @
dcfd63a2
...
...
@@ -117,7 +117,30 @@ _OVER:
return
code
;
}
int32_t
dndOpenRuntimeFile
(
SDnode
*
pDnode
)
{
TdFilePtr
dndCheckRunning
(
const
char
*
dataDir
)
{
char
filepath
[
PATH_MAX
]
=
{
0
};
snprintf
(
filepath
,
sizeof
(
filepath
),
"%s%s.running"
,
dataDir
,
TD_DIRSEP
);
TdFilePtr
pFile
=
taosOpenFile
(
filepath
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to lock file:%s since %s"
,
filepath
,
terrstr
());
return
NULL
;
}
int32_t
ret
=
taosLockFile
(
pFile
);
if
(
ret
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to lock file:%s since %s"
,
filepath
,
terrstr
());
taosCloseFile
(
&
pFile
);
return
NULL
;
}
dDebug
(
"file:%s is locked"
,
filepath
);
return
pFile
;
}
int32_t
dndReadShmFile
(
SDnode
*
pDnode
)
{
int32_t
code
=
-
1
;
char
itemName
[
24
]
=
{
0
};
char
content
[
MAXLEN
+
1
]
=
{
0
};
...
...
@@ -125,17 +148,11 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) {
cJSON
*
root
=
NULL
;
TdFilePtr
pFile
=
NULL
;
snprintf
(
file
,
sizeof
(
file
),
"%s%s.
running
"
,
pDnode
->
dataDir
,
TD_DIRSEP
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_
CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
snprintf
(
file
,
sizeof
(
file
),
"%s%s.
shmfile
"
,
pDnode
->
dataDir
,
TD_DIRSEP
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_
READ
);
if
(
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to open file:%s since %s"
,
file
,
terrstr
());
goto
_OVER
;
}
if
(
taosLockFile
(
pFile
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to lock file:%s since %s"
,
file
,
terrstr
());
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
goto
_OVER
;
}
...
...
@@ -162,10 +179,10 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) {
}
}
if
(
tsMultiProcess
||
pDnode
->
ntype
==
DNODE
)
{
if
(
!
tsMultiProcess
||
pDnode
->
ntype
==
DNODE
)
{
for
(
ENodeType
ntype
=
DNODE
;
ntype
<
NODE_MAX
;
++
ntype
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
pDnode
->
ntype
];
if
(
pWrapper
->
shm
.
id
>
0
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
if
(
pWrapper
->
shm
.
id
>
=
0
)
{
dDebug
(
"shmid:%d, is closed, size:%d"
,
pWrapper
->
shm
.
id
,
pWrapper
->
shm
.
size
);
taosDropShm
(
&
pWrapper
->
shm
);
}
...
...
@@ -185,16 +202,12 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) {
_OVER:
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
code
!=
0
)
{
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
}
else
{
pDnode
->
runtimeFile
=
pFile
;
}
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
return
code
;
}
int32_t
dndWrite
Runtime
File
(
SDnode
*
pDnode
)
{
int32_t
dndWrite
Shm
File
(
SDnode
*
pDnode
)
{
int32_t
code
=
-
1
;
int32_t
len
=
0
;
char
content
[
MAXLEN
+
1
]
=
{
0
};
...
...
@@ -202,8 +215,8 @@ int32_t dndWriteRuntimeFile(SDnode *pDnode) {
char
realfile
[
PATH_MAX
]
=
{
0
};
TdFilePtr
pFile
=
NULL
;
snprintf
(
file
,
sizeof
(
file
),
"%s%s.
running
.bak"
,
pDnode
->
dataDir
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%s.
running
"
,
pDnode
->
dataDir
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%s.
shmfile
.bak"
,
pDnode
->
dataDir
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%s.
shmfile
"
,
pDnode
->
dataDir
,
TD_DIRSEP
);
pFile
=
taosOpenFile
(
file
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
...
...
@@ -214,12 +227,12 @@ int32_t dndWriteRuntimeFile(SDnode *pDnode) {
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"{
\n
"
);
for
(
ENodeType
ntype
=
DNODE
+
1
;
ntype
<
NODE_MAX
;
++
ntype
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
pDnode
->
ntype
];
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmid
\"
:
%d,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
id
);
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmid
\"
:%d,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
id
);
if
(
ntype
==
NODE_MAX
-
1
)
{
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:
%d
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
size
);
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:%d
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
size
);
}
else
{
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:
%d,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
size
);
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:%d,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
size
);
}
}
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"}
\n
"
);
...
...
@@ -244,7 +257,7 @@ int32_t dndWriteRuntimeFile(SDnode *pDnode) {
return
-
1
;
}
d
Debug
(
"successed to write %s"
,
realfile
);
d
Info
(
"successed to write %s"
,
realfile
);
code
=
0
;
_OVER:
...
...
@@ -254,11 +267,3 @@ _OVER:
return
code
;
}
void
dndCloseRuntimeFile
(
SDnode
*
pDnode
)
{
if
(
pDnode
->
runtimeFile
)
{
taosUnLockFile
(
pDnode
->
runtimeFile
);
taosCloseFile
(
&
pDnode
->
runtimeFile
);
pDnode
->
runtimeFile
=
NULL
;
}
}
\ No newline at end of file
source/dnode/mgmt/main/src/dndInt.c
浏览文件 @
dcfd63a2
...
...
@@ -41,7 +41,7 @@ int32_t dndInit() {
return
-
1
;
}
d
Debug
(
"dnode env is initialized"
);
d
Info
(
"dnode env is initialized"
);
return
0
;
}
...
...
@@ -55,7 +55,7 @@ void dndCleanup() {
monCleanup
();
walCleanUp
();
taosStopCacheRefreshWorker
();
d
Debug
(
"dnode env is cleaned up"
);
d
Info
(
"dnode env is cleaned up"
);
}
void
dndSetMsgHandle
(
SMgmtWrapper
*
pWrapper
,
tmsg_t
msgType
,
NodeMsgFp
nodeMsgFp
,
int8_t
vgId
)
{
...
...
source/dnode/mgmt/main/src/dndObj.c
浏览文件 @
dcfd63a2
...
...
@@ -34,6 +34,12 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pDnode
->
lockfile
=
dndCheckRunning
(
pDnode
->
dataDir
);
if
(
pDnode
->
lockfile
==
NULL
)
{
return
-
1
;
}
return
0
;
}
...
...
@@ -42,7 +48,11 @@ static void dndClearVars(SDnode *pDnode) {
SMgmtWrapper
*
pMgmt
=
&
pDnode
->
wrappers
[
n
];
taosMemoryFreeClear
(
pMgmt
->
path
);
}
dndCloseRuntimeFile
(
pDnode
);
if
(
pDnode
->
lockfile
!=
NULL
)
{
taosUnLockFile
(
pDnode
->
lockfile
);
taosCloseFile
(
&
pDnode
->
lockfile
);
pDnode
->
lockfile
=
NULL
;
}
taosMemoryFreeClear
(
pDnode
->
localEp
);
taosMemoryFreeClear
(
pDnode
->
localFqdn
);
taosMemoryFreeClear
(
pDnode
->
firstEp
);
...
...
@@ -53,7 +63,7 @@ static void dndClearVars(SDnode *pDnode) {
}
SDnode
*
dndCreate
(
const
SDnodeOpt
*
pOption
)
{
d
Info
(
"start to create dnode object"
);
d
Debug
(
"start to create dnode object"
);
int32_t
code
=
-
1
;
char
path
[
PATH_MAX
]
=
{
0
};
SDnode
*
pDnode
=
NULL
;
...
...
@@ -77,29 +87,11 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
smGetMgmtFp
(
&
pDnode
->
wrappers
[
SNODE
]);
bmGetMgmtFp
(
&
pDnode
->
wrappers
[
BNODE
]);
if
(
dndOpenRuntimeFile
(
pDnode
)
!=
0
)
{
dError
(
"failed to open runtime file since %s"
,
terrstr
());
goto
_OVER
;
}
if
(
dndInitServer
(
pDnode
)
!=
0
)
{
dError
(
"failed to init trans server since %s"
,
terrstr
());
goto
_OVER
;
}
if
(
dndInitClient
(
pDnode
)
!=
0
)
{
dError
(
"failed to init trans client since %s"
,
terrstr
());
goto
_OVER
;
}
if
(
dndInitMsgHandle
(
pDnode
)
!=
0
)
{
goto
_OVER
;
}
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
snprintf
(
path
,
sizeof
(
path
),
"%s%s%s"
,
pDnode
->
dataDir
,
TD_DIRSEP
,
pWrapper
->
name
);
pWrapper
->
path
=
strdup
(
path
);
pWrapper
->
shm
.
id
=
-
1
;
pWrapper
->
pDnode
=
pDnode
;
if
(
pWrapper
->
path
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -110,6 +102,20 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
taosInitRWLatch
(
&
pWrapper
->
latch
);
}
if
(
dndInitMsgHandle
(
pDnode
)
!=
0
)
{
dError
(
"failed to msg handles since %s"
,
terrstr
());
goto
_OVER
;
}
if
(
dndReadShmFile
(
pDnode
)
!=
0
)
{
dError
(
"failed to read shm file since %s"
,
terrstr
());
goto
_OVER
;
}
SMsgCb
msgCb
=
dndCreateMsgcb
(
&
pDnode
->
wrappers
[
0
]);
tmsgSetDefaultMsgCb
(
&
msgCb
);
dInfo
(
"dnode object is created, data:%p"
,
pDnode
);
code
=
0
;
_OVER:
...
...
@@ -117,8 +123,6 @@ _OVER:
dndClearVars
(
pDnode
);
pDnode
=
NULL
;
dError
(
"failed to create dnode object since %s"
,
terrstr
());
}
else
{
dInfo
(
"dnode object is created, data:%p"
,
pDnode
);
}
return
pDnode
;
...
...
@@ -135,9 +139,6 @@ void dndClose(SDnode *pDnode) {
dInfo
(
"start to close dnode, data:%p"
,
pDnode
);
dndSetStatus
(
pDnode
,
DND_STAT_STOPPED
);
dndCleanupServer
(
pDnode
);
dndCleanupClient
(
pDnode
);
for
(
ENodeType
n
=
0
;
n
<
NODE_MAX
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
dndCloseNode
(
pWrapper
);
...
...
source/os/src/osShm.c
浏览文件 @
dcfd63a2
...
...
@@ -18,7 +18,9 @@
#include "os.h"
int32_t
taosCreateShm
(
SShm
*
pShm
,
int32_t
shmsize
)
{
int32_t
shmid
=
shmget
(
IPC_PRIVATE
,
shmsize
,
IPC_CREAT
|
0600
);
pShm
->
id
=
-
1
;
int32_t
shmid
=
shmget
(
0X95279527
,
shmsize
,
IPC_CREAT
|
0600
);
if
(
shmid
<
0
)
{
return
-
1
;
}
...
...
@@ -35,19 +37,19 @@ int32_t taosCreateShm(SShm* pShm, int32_t shmsize) {
}
void
taosDropShm
(
SShm
*
pShm
)
{
if
(
pShm
->
id
>
0
)
{
if
(
pShm
->
id
>
=
0
)
{
if
(
pShm
->
ptr
!=
NULL
)
{
shmdt
(
pShm
->
ptr
);
}
shmctl
(
pShm
->
id
,
IPC_RMID
,
NULL
);
}
pShm
->
id
=
0
;
pShm
->
id
=
-
1
;
pShm
->
size
=
0
;
pShm
->
ptr
=
NULL
;
}
int32_t
taosAttachShm
(
SShm
*
pShm
)
{
if
(
pShm
->
id
>
0
&&
pShm
->
size
>
0
)
{
if
(
pShm
->
id
>
=
0
)
{
pShm
->
ptr
=
shmat
(
pShm
->
id
,
NULL
,
0
);
if
(
pShm
->
ptr
!=
NULL
)
{
return
0
;
...
...
@@ -56,16 +58,3 @@ int32_t taosAttachShm(SShm* pShm) {
return
-
1
;
}
void
taosDetachShm
(
SShm
*
pShm
)
{
if
(
pShm
->
id
>
0
)
{
if
(
pShm
->
ptr
!=
NULL
)
{
shmdt
(
pShm
->
ptr
);
pShm
->
ptr
=
NULL
;
}
}
pShm
->
id
=
0
;
pShm
->
size
=
0
;
pShm
->
ptr
=
NULL
;
}
source/util/src/tprocess.c
浏览文件 @
dcfd63a2
...
...
@@ -23,34 +23,36 @@
typedef
void
*
(
*
ProcThreadFp
)(
void
*
param
);
typedef
struct
SProcQueue
{
int32_t
head
;
int32_t
tail
;
int32_t
total
;
int32_t
avail
;
int32_t
items
;
char
*
pBuffer
;
ProcMallocFp
mallocHeadFp
;
ProcFreeFp
freeHeadFp
;
ProcMallocFp
mallocBodyFp
;
ProcFreeFp
freeBodyFp
;
ProcConsumeFp
consumeFp
;
void
*
pParent
;
tsem_t
sem
;
TdThreadMutex
*
mutex
;
int32_t
mutexShmid
;
int32_t
bufferShmid
;
const
char
*
name
;
int32_t
head
;
int32_t
tail
;
int32_t
total
;
int32_t
avail
;
int32_t
items
;
char
name
[
8
];
TdThreadMutex
mutex
;
tsem_t
sem
;
char
pBuffer
[];
}
SProcQueue
;
typedef
struct
SProcObj
{
TdThread
childThread
;
SProcQueue
*
pChildQueue
;
TdThread
parentThread
;
SProcQueue
*
pParentQueue
;
const
char
*
name
;
int32_t
pid
;
bool
isChild
;
bool
stopFlag
;
TdThread
thread
;
SProcQueue
*
pChildQueue
;
SProcQueue
*
pParentQueue
;
ProcConsumeFp
childConsumeFp
;
ProcMallocFp
childMallocHeadFp
;
ProcFreeFp
childFreeHeadFp
;
ProcMallocFp
childMallocBodyFp
;
ProcFreeFp
childFreeBodyFp
;
ProcConsumeFp
parentConsumeFp
;
ProcMallocFp
parentMallocHeadFp
;
ProcFreeFp
parentFreeHeadFp
;
ProcMallocFp
parentMallocBodyFp
;
ProcFreeFp
parentFreeBodyFp
;
void
*
pParent
;
const
char
*
name
;
int32_t
pid
;
bool
isChild
;
bool
stopFlag
;
}
SProcObj
;
static
inline
int32_t
CEIL8
(
int32_t
v
)
{
...
...
@@ -58,150 +60,94 @@ static inline int32_t CEIL8(int32_t v) {
return
c
<
8
?
8
:
c
;
}
static
int32_t
taosProcInitMutex
(
TdThreadMutex
**
ppMutex
,
int32_t
*
pShmid
)
{
TdThreadMutex
*
pMutex
=
NULL
;
static
int32_t
taosProcInitMutex
(
SProcQueue
*
pQueue
)
{
TdThreadMutexAttr
mattr
=
{
0
};
int32_t
shmid
=
-
1
;
int32_t
code
=
-
1
;
if
(
taosThreadMutexAttrInit
(
&
mattr
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex while init attr since %s"
,
terrstr
());
goto
_OVER
;
return
-
1
;
}
if
(
taosThreadMutexAttrSetPshared
(
&
mattr
,
PTHREAD_PROCESS_SHARED
)
!=
0
)
{
taosThreadMutexAttrDestroy
(
&
mattr
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex while set shared since %s"
,
terrstr
());
goto
_OVER
;
}
shmid
=
shmget
(
IPC_PRIVATE
,
sizeof
(
TdThreadMutex
),
IPC_CREAT
|
0600
);
if
(
shmid
<=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex while shmget since %s"
,
terrstr
());
goto
_OVER
;
}
pMutex
=
(
TdThreadMutex
*
)
shmat
(
shmid
,
NULL
,
0
);
if
(
pMutex
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex while shmat since %s"
,
terrstr
());
goto
_OVER
;
return
-
1
;
}
if
(
taosThreadMutexInit
(
pMutex
,
&
mattr
)
!=
0
)
{
if
(
taosThreadMutexInit
(
&
pQueue
->
mutex
,
&
mattr
)
!=
0
)
{
taosThreadMutexDestroy
(
&
pQueue
->
mutex
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex since %s"
,
terrstr
());
goto
_OVER
;
}
code
=
0
;
_OVER:
if
(
code
!=
0
)
{
if
(
pMutex
!=
NULL
)
{
taosThreadMutexDestroy
(
pMutex
);
shmdt
(
pMutex
);
}
if
(
shmid
>=
0
)
{
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
}
}
else
{
*
ppMutex
=
pMutex
;
*
pShmid
=
shmid
;
return
-
1
;
}
taosThreadMutexAttrDestroy
(
&
mattr
);
return
code
;
}
static
void
taosProcDestroyMutex
(
TdThreadMutex
*
pMutex
,
int32_t
shmid
)
{
if
(
pMutex
!=
NULL
)
{
taosThreadMutexDestroy
(
pMutex
);
}
if
(
shmid
>=
0
)
{
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
}
return
0
;
}
static
int32_t
taosProcInitBuffer
(
void
**
ppBuffer
,
int32_t
size
)
{
int32_t
shmid
=
shmget
(
IPC_PRIVATE
,
size
,
IPC_CREAT
|
0600
);
if
(
shmid
<=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init buffer while shmget since %s"
,
terrstr
());
return
-
1
;
}
void
*
shmptr
=
shmat
(
shmid
,
NULL
,
0
);
if
(
shmptr
==
NULL
)
{
static
int32_t
taosProcInitSem
(
SProcQueue
*
pQueue
)
{
if
(
tsem_init
(
&
pQueue
->
sem
,
1
,
0
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init buffer while shmat since %s"
,
terrstr
());
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
uError
(
"failed to init sem"
);
return
-
1
;
}
*
ppBuffer
=
shmptr
;
return
shmid
;
return
0
;
}
static
void
taosProcDestroyBuffer
(
void
*
pBuffer
,
int32_t
shmid
)
{
if
(
shmid
>
0
)
{
shmdt
(
pBuffer
);
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
static
SProcQueue
*
taosProcInitQueue
(
const
char
*
name
,
bool
isChild
,
char
*
ptr
,
int32_t
size
)
{
int32_t
bufSize
=
size
-
CEIL8
(
sizeof
(
SProcQueue
));
if
(
bufSize
<=
1024
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
}
static
SProcQueue
*
taosProcInitQueue
(
int32_t
size
)
{
if
(
size
<=
0
)
size
=
SHM_DEFAULT_SIZE
;
SProcQueue
*
pQueue
=
(
SProcQueue
*
)(
ptr
);
int32_t
bufSize
=
CEIL8
(
size
);
int32_t
headSize
=
CEIL8
(
sizeof
(
SProcQueue
));
if
(
!
isChild
)
{
if
(
taosProcInitMutex
(
pQueue
)
!=
0
)
{
return
NULL
;
}
SProcQueue
*
pQueue
=
NULL
;
int32_t
shmId
=
taosProcInitBuffer
((
void
**
)
&
pQueue
,
bufSize
+
headSize
);
if
(
shmId
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pQueue
->
bufferShmid
=
shmId
;
if
(
taosProcInitSem
(
pQueue
)
!=
0
)
{
return
NULL
;
}
if
(
taosProcInitMutex
(
&
pQueue
->
mutex
,
&
pQueue
->
mutexShmid
)
!=
0
)
{
taosProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
return
NULL
;
tstrncpy
(
pQueue
->
name
,
name
,
sizeof
(
pQueue
->
name
));
pQueue
->
head
=
0
;
pQueue
->
tail
=
0
;
pQueue
->
total
=
bufSize
;
pQueue
->
avail
=
bufSize
;
pQueue
->
items
=
0
;
}
if
(
tsem_init
(
&
pQueue
->
sem
,
1
,
0
)
!=
0
)
{
taosProcDestroyMutex
(
pQueue
->
mutex
,
pQueue
->
mutexShmid
);
taosProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
return
pQueue
;
}
if
(
taosProcInitMutex
(
&
pQueue
->
mutex
,
&
pQueue
->
mutexShmid
)
!=
0
)
{
tsem_destroy
(
&
pQueue
->
sem
);
taosProcDestroyMutex
(
pQueue
->
mutex
,
pQueue
->
mutexShmid
);
taos
ProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
return
NULL
;
#if 0
static void taosProcDestroyMutex(SProcQueue *pQueue) {
if (pQueue->mutex != NULL) {
taos
ThreadMutexDestroy(pQueue->mutex
);
pQueue->mutex =
NULL;
}
}
pQueue
->
head
=
0
;
pQueue
->
tail
=
0
;
pQueue
->
total
=
bufSize
;
pQueue
->
avail
=
bufSize
;
pQueue
->
items
=
0
;
pQueue
->
pBuffer
=
(
char
*
)
pQueue
+
headSize
;
return
pQueue
;
static void taosProcDestroySem(SProcQueue *pQueue) {
if (pQueue->sem != NULL) {
tsem_destroy(pQueue->sem);
pQueue->sem = NULL;
}
}
static void taosProcCleanupQueue(SProcQueue *pQueue) {
if (pQueue != NULL) {
uDebug
(
"proc:%s, queue:%p clean up"
,
pQueue
->
name
,
pQueue
);
tsem_destroy
(
&
pQueue
->
sem
);
taosProcDestroyMutex
(
pQueue
->
mutex
,
pQueue
->
mutexShmid
);
taosProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
taosProcDestroyMutex(pQueue);
taosProcDestroySem(pQueue);
}
}
#endif
static
int32_t
taosProcQueuePush
(
SProcQueue
*
pQueue
,
const
char
*
pHead
,
int16_t
rawHeadLen
,
const
char
*
pBody
,
int32_t
rawBodyLen
,
ProcFuncType
ftype
)
{
...
...
@@ -209,9 +155,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
const
int32_t
bodyLen
=
CEIL8
(
rawBodyLen
);
const
int32_t
fullLen
=
headLen
+
bodyLen
+
8
;
taosThreadMutexLock
(
pQueue
->
mutex
);
taosThreadMutexLock
(
&
pQueue
->
mutex
);
if
(
fullLen
>
pQueue
->
avail
)
{
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
terrno
=
TSDB_CODE_OUT_OF_SHM_MEM
;
return
-
1
;
}
...
...
@@ -260,7 +206,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
pQueue
->
avail
-=
fullLen
;
pQueue
->
items
++
;
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
tsem_post
(
&
pQueue
->
sem
);
uTrace
(
"proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p"
,
pQueue
->
name
,
pos
,
ftype
,
...
...
@@ -268,13 +214,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
return
0
;
}
static
int32_t
taosProcQueuePop
(
SProcQueue
*
pQueue
,
void
**
ppHead
,
int16_t
*
pHeadLen
,
void
**
ppBody
,
int32_t
*
pBodyLen
,
ProcFuncType
*
pFuncType
)
{
static
int32_t
taosProcQueuePop
(
SProcQueue
*
pQueue
,
void
**
ppHead
,
int16_t
*
pHeadLen
,
void
**
ppBody
,
int32_t
*
pBodyLen
,
ProcFuncType
*
pFuncType
,
ProcMallocFp
mallocHeadFp
,
ProcFreeFp
freeHeadFp
,
ProcMallocFp
mallocBodyFp
,
ProcFreeFp
freeBodyFp
)
{
tsem_wait
(
&
pQueue
->
sem
);
taosThreadMutexLock
(
pQueue
->
mutex
);
taosThreadMutexLock
(
&
pQueue
->
mutex
);
if
(
pQueue
->
total
-
pQueue
->
avail
<=
0
)
{
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
tsem_post
(
&
pQueue
->
sem
);
terrno
=
TSDB_CODE_OUT_OF_SHM_MEM
;
return
0
;
...
...
@@ -293,13 +240,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
bodyLen
=
*
(
int32_t
*
)(
pQueue
->
pBuffer
+
4
);
}
void
*
pHead
=
(
*
pQueue
->
mallocHeadFp
)(
headLen
);
void
*
pBody
=
(
*
pQueue
->
mallocBodyFp
)(
bodyLen
);
void
*
pHead
=
(
*
mallocHeadFp
)(
headLen
);
void
*
pBody
=
(
*
mallocBodyFp
)(
bodyLen
);
if
(
pHead
==
NULL
||
pBody
==
NULL
)
{
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
tsem_post
(
&
pQueue
->
sem
);
(
*
pQueue
->
freeHeadFp
)(
pHead
);
(
*
pQueue
->
freeBodyFp
)(
pBody
);
(
*
freeHeadFp
)(
pHead
);
(
*
freeBodyFp
)(
pBody
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
...
...
@@ -338,7 +285,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
pQueue
->
avail
=
pQueue
->
avail
+
headLen
+
bodyLen
+
8
;
pQueue
->
items
--
;
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
*
ppHead
=
pHead
;
*
ppBody
=
pBody
;
...
...
@@ -358,65 +305,85 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
return
NULL
;
}
int32_t
cstart
=
0
;
int32_t
csize
=
CEIL8
(
pCfg
->
shm
.
size
/
2
);
int32_t
pstart
=
csize
;
int32_t
psize
=
CEIL8
(
pCfg
->
shm
.
size
-
pstart
);
if
(
pstart
+
psize
>
pCfg
->
shm
.
size
)
{
psize
-=
8
;
}
pProc
->
name
=
pCfg
->
name
;
pProc
->
pChildQueue
=
taosProcInitQueue
(
pCfg
->
childQueueS
ize
);
pProc
->
pParentQueue
=
taosProcInitQueue
(
pCfg
->
parentQueueS
ize
);
pProc
->
pChildQueue
=
taosProcInitQueue
(
pCfg
->
name
,
pCfg
->
isChild
,
(
char
*
)
pCfg
->
shm
.
ptr
+
cstart
,
cs
ize
);
pProc
->
pParentQueue
=
taosProcInitQueue
(
pCfg
->
name
,
pCfg
->
isChild
,
(
char
*
)
pCfg
->
shm
.
ptr
+
pstart
,
ps
ize
);
if
(
pProc
->
pChildQueue
==
NULL
||
pProc
->
pParentQueue
==
NULL
)
{
taosProcCleanupQueue
(
pProc
->
pChildQueue
);
//
taosProcCleanupQueue(pProc->pChildQueue);
taosMemoryFree
(
pProc
);
return
NULL
;
}
pProc
->
pChildQueue
->
name
=
pCfg
->
name
;
pProc
->
pChildQueue
->
pParent
=
pCfg
->
pParent
;
pProc
->
pChildQueue
->
mallocHeadFp
=
pCfg
->
childMallocHeadFp
;
pProc
->
pChildQueue
->
freeHeadFp
=
pCfg
->
childFreeHeadFp
;
pProc
->
pChildQueue
->
mallocBodyFp
=
pCfg
->
childMallocBodyFp
;
pProc
->
pChildQueue
->
freeBodyFp
=
pCfg
->
childFreeBodyFp
;
pProc
->
pChildQueue
->
consumeFp
=
pCfg
->
childConsumeFp
;
pProc
->
pParentQueue
->
name
=
pCfg
->
name
;
pProc
->
pParentQueue
->
pParent
=
pCfg
->
pParent
;
pProc
->
pParentQueue
->
mallocHeadFp
=
pCfg
->
parentdMallocHeadFp
;
pProc
->
pParentQueue
->
freeHeadFp
=
pCfg
->
parentFreeHeadFp
;
pProc
->
pParentQueue
->
mallocBodyFp
=
pCfg
->
parentMallocBodyFp
;
pProc
->
pParentQueue
->
freeBodyFp
=
pCfg
->
parentFreeBodyFp
;
pProc
->
pParentQueue
->
consumeFp
=
pCfg
->
parentConsumeFp
;
uDebug
(
"proc:%s, is initialized, child queue:%p parent queue:%p"
,
pProc
->
name
,
pProc
->
pChildQueue
,
pProc
->
pParentQueue
);
pProc
->
pid
=
fork
();
if
(
pProc
->
pid
==
0
)
{
pProc
->
isChild
=
1
;
prctl
(
PR_SET_NAME
,
pProc
->
name
,
NULL
,
NULL
,
NULL
);
}
else
{
pProc
->
isChild
=
0
;
uInfo
(
"this is parent process, child pid:%d"
,
pProc
->
pid
);
}
pProc
->
name
=
pCfg
->
name
;
pProc
->
pParent
=
pCfg
->
pParent
;
pProc
->
childMallocHeadFp
=
pCfg
->
childMallocHeadFp
;
pProc
->
childFreeHeadFp
=
pCfg
->
childFreeHeadFp
;
pProc
->
childMallocBodyFp
=
pCfg
->
childMallocBodyFp
;
pProc
->
childFreeBodyFp
=
pCfg
->
childFreeBodyFp
;
pProc
->
childConsumeFp
=
pCfg
->
childConsumeFp
;
pProc
->
parentMallocHeadFp
=
pCfg
->
parentMallocHeadFp
;
pProc
->
parentFreeHeadFp
=
pCfg
->
parentFreeHeadFp
;
pProc
->
parentMallocBodyFp
=
pCfg
->
parentMallocBodyFp
;
pProc
->
parentFreeBodyFp
=
pCfg
->
parentFreeBodyFp
;
pProc
->
parentConsumeFp
=
pCfg
->
parentConsumeFp
;
pProc
->
isChild
=
pCfg
->
isChild
;
uDebug
(
"proc:%s, is initialized, child:%d child queue:%p parent queue:%p"
,
pProc
->
name
,
pProc
->
isChild
,
pProc
->
pChildQueue
,
pProc
->
pParentQueue
);
return
pProc
;
}
static
void
taosProcThreadLoop
(
SProcQueue
*
pQueue
)
{
ProcConsumeFp
consumeFp
=
pQueue
->
consumeFp
;
void
*
pParent
=
pQueue
->
pParent
;
static
void
taosProcThreadLoop
(
SProcObj
*
pProc
)
{
void
*
pHead
,
*
pBody
;
int16_t
headLen
;
ProcFuncType
ftype
;
int32_t
bodyLen
;
SProcQueue
*
pQueue
;
ProcConsumeFp
consumeFp
;
ProcMallocFp
mallocHeadFp
;
ProcFreeFp
freeHeadFp
;
ProcMallocFp
mallocBodyFp
;
ProcFreeFp
freeBodyFp
;
uDebug
(
"proc:%s, start to get msg from queue:%p"
,
pQueue
->
name
,
pQueue
);
if
(
pProc
->
isChild
)
{
pQueue
=
pProc
->
pChildQueue
;
consumeFp
=
pProc
->
childConsumeFp
;
mallocHeadFp
=
pProc
->
childMallocHeadFp
;
freeHeadFp
=
pProc
->
childFreeHeadFp
;
mallocBodyFp
=
pProc
->
childMallocBodyFp
;
freeBodyFp
=
pProc
->
childFreeBodyFp
;
}
else
{
pQueue
=
pProc
->
pParentQueue
;
consumeFp
=
pProc
->
parentConsumeFp
;
mallocHeadFp
=
pProc
->
parentMallocHeadFp
;
freeHeadFp
=
pProc
->
parentFreeHeadFp
;
mallocBodyFp
=
pProc
->
parentMallocBodyFp
;
freeBodyFp
=
pProc
->
parentFreeBodyFp
;
}
uDebug
(
"proc:%s, start to get msg from queue:%p"
,
pProc
->
name
,
pQueue
);
while
(
1
)
{
int32_t
numOfMsgs
=
taosProcQueuePop
(
pQueue
,
&
pHead
,
&
headLen
,
&
pBody
,
&
bodyLen
,
&
ftype
);
int32_t
numOfMsgs
=
taosProcQueuePop
(
pQueue
,
&
pHead
,
&
headLen
,
&
pBody
,
&
bodyLen
,
&
ftype
,
mallocHeadFp
,
freeHeadFp
,
mallocBodyFp
,
freeBodyFp
);
if
(
numOfMsgs
==
0
)
{
u
Debug
(
"proc:%s, get no msg from queue:%p and exit the proc thread"
,
pQueue
->
name
,
pQueue
);
u
Info
(
"proc:%s, get no msg from queue:%p and exit the proc thread"
,
pProc
->
name
,
pQueue
);
break
;
}
else
if
(
numOfMsgs
<
0
)
{
uTrace
(
"proc:%s, get no msg from queue:%p since %s"
,
p
Queue
->
name
,
pQueue
,
terrstr
());
uTrace
(
"proc:%s, get no msg from queue:%p since %s"
,
p
Proc
->
name
,
pQueue
,
terrstr
());
taosMsleep
(
1
);
continue
;
}
else
{
(
*
consumeFp
)(
pParent
,
pHead
,
headLen
,
pBody
,
bodyLen
,
ftype
);
(
*
consumeFp
)(
pP
roc
->
pP
arent
,
pHead
,
headLen
,
pBody
,
bodyLen
,
ftype
);
}
}
}
...
...
@@ -426,40 +393,36 @@ int32_t taosProcRun(SProcObj *pProc) {
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pProc
->
isChild
)
{
if
(
taosThreadCreate
(
&
pProc
->
childThread
,
&
thAttr
,
(
ProcThreadFp
)
taosProcThreadLoop
,
pProc
->
pChildQueue
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to create thread since %s"
,
terrstr
());
return
-
1
;
}
uDebug
(
"proc:%s, child start to consume queue:%p"
,
pProc
->
name
,
pProc
->
pChildQueue
);
}
else
{
if
(
taosThreadCreate
(
&
pProc
->
parentThread
,
&
thAttr
,
(
ProcThreadFp
)
taosProcThreadLoop
,
pProc
->
pParentQueue
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to create thread since %s"
,
terrstr
());
return
-
1
;
}
uDebug
(
"proc:%s, parent start to consume queue:%p"
,
pProc
->
name
,
pProc
->
pParentQueue
);
if
(
taosThreadCreate
(
&
pProc
->
thread
,
&
thAttr
,
(
ProcThreadFp
)
taosProcThreadLoop
,
pProc
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to create thread since %s"
,
terrstr
());
return
-
1
;
}
uDebug
(
"proc:%s, start to consume queue:%p"
,
pProc
->
name
,
pProc
->
pChildQueue
);
return
0
;
}
void
taosProcStop
(
SProcObj
*
pProc
)
{
pProc
->
stopFlag
=
true
;
// todo join
}
bool
taosProcIsChild
(
SProcObj
*
pProc
)
{
return
pProc
->
isChild
;
}
static
void
taosProcStop
(
SProcObj
*
pProc
)
{
if
(
!
taosCheckPthreadValid
(
pProc
->
thread
))
return
;
int32_t
taosProcChildId
(
SProcObj
*
pProc
)
{
return
pProc
->
pid
;
}
uDebug
(
"proc:%s, start to join thread"
,
pProc
->
name
);
SProcQueue
*
pQueue
;
if
(
pProc
->
isChild
)
{
pQueue
=
pProc
->
pParentQueue
;
}
else
{
pQueue
=
pProc
->
pChildQueue
;
}
tsem_post
(
&
pQueue
->
sem
);
taosThreadJoin
(
pProc
->
thread
,
NULL
);
}
void
taosProcCleanup
(
SProcObj
*
pProc
)
{
if
(
pProc
!=
NULL
)
{
uDebug
(
"proc:%s, clean up"
,
pProc
->
name
);
taosProcStop
(
pProc
);
taosProcCleanupQueue
(
pProc
->
pChildQueue
);
taosProcCleanupQueue
(
pProc
->
pParentQueue
);
//
taosProcCleanupQueue(pProc->pChildQueue);
//
taosProcCleanupQueue(pProc->pParentQueue);
taosMemoryFree
(
pProc
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录