Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
79936567
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
79936567
编写于
3月 30, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
shm
上级
ae182cd7
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
196 addition
and
241 deletion
+196
-241
include/os/osShm.h
include/os/osShm.h
+2
-3
include/util/tprocess.h
include/util/tprocess.h
+2
-4
source/dnode/mgmt/main/exe/dndMain.c
source/dnode/mgmt/main/exe/dndMain.c
+4
-0
source/dnode/mgmt/main/src/dndExec.c
source/dnode/mgmt/main/src/dndExec.c
+6
-4
source/dnode/mgmt/main/src/dndFile.c
source/dnode/mgmt/main/src/dndFile.c
+14
-16
source/dnode/mgmt/main/src/dndObj.c
source/dnode/mgmt/main/src/dndObj.c
+1
-0
source/os/src/osShm.c
source/os/src/osShm.c
+7
-18
source/util/src/tprocess.c
source/util/src/tprocess.c
+160
-196
未找到文件。
include/os/osShm.h
浏览文件 @
79936567
...
...
@@ -22,14 +22,13 @@ extern "C" {
typedef
struct
{
int32_t
id
;
int
64
_t
size
;
int
32
_t
size
;
void
*
ptr
;
}
SShm
;
int32_t
taosCreateShm
(
SShm
*
pShm
,
int
64
_t
shmsize
)
;
int32_t
taosCreateShm
(
SShm
*
pShm
,
int
32
_t
shmsize
)
;
void
taosDropShm
(
SShm
*
pShm
);
int32_t
taosAttachShm
(
SShm
*
pShm
);
void
taosDetachShm
(
SShm
*
pShm
);
#ifdef __cplusplus
}
...
...
include/util/tprocess.h
浏览文件 @
79936567
...
...
@@ -38,21 +38,19 @@ typedef struct {
ProcMallocFp
childMallocBodyFp
;
ProcFreeFp
childFreeBodyFp
;
ProcConsumeFp
parentConsumeFp
;
ProcMallocFp
parent
d
MallocHeadFp
;
ProcMallocFp
parentMallocHeadFp
;
ProcFreeFp
parentFreeHeadFp
;
ProcMallocFp
parentMallocBodyFp
;
ProcFreeFp
parentFreeBodyFp
;
SShm
shm
;
void
*
pParent
;
const
char
*
name
;
bool
isChild
;
}
SProcCfg
;
SProcObj
*
taosProcInit
(
const
SProcCfg
*
pCfg
);
void
taosProcCleanup
(
SProcObj
*
pProc
);
int32_t
taosProcRun
(
SProcObj
*
pProc
);
void
taosProcStop
(
SProcObj
*
pProc
);
bool
taosProcIsChild
(
SProcObj
*
pProc
);
int32_t
taosProcChildId
(
SProcObj
*
pProc
);
int32_t
taosProcPutToChildQ
(
SProcObj
*
pProc
,
const
void
*
pHead
,
int16_t
headLen
,
const
void
*
pBody
,
int32_t
bodyLen
,
ProcFuncType
ftype
);
int32_t
taosProcPutToParentQ
(
SProcObj
*
pProc
,
const
void
*
pHead
,
int16_t
headLen
,
const
void
*
pBody
,
int32_t
bodyLen
,
...
...
source/dnode/mgmt/main/exe/dndMain.c
浏览文件 @
79936567
...
...
@@ -83,6 +83,10 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) {
global
.
generateGrant
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
global
.
ntype
=
atoi
(
argv
[
++
i
]);
if
(
global
.
ntype
<=
DNODE
||
global
.
ntype
>
NODE_MAX
)
{
printf
(
"'-n' range is [1-5], default is 0
\n
"
);
return
-
1
;
}
}
else
if
(
strcmp
(
argv
[
i
],
"-C"
)
==
0
)
{
global
.
dumpConfig
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-V"
)
==
0
)
{
...
...
source/dnode/mgmt/main/src/dndExec.c
浏览文件 @
79936567
...
...
@@ -166,14 +166,16 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
childFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
parentConsumeFp
=
(
ProcConsumeFp
)
dndConsumeParentQueue
,
.
parent
d
MallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentMallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
shm
,
.
pParent
=
pWrapper
,
.
isChild
=
false
,
.
name
=
pWrapper
->
name
};
pWrapper
->
procType
=
PROC_PARENT
;
pWrapper
->
pProc
=
taosProcInit
(
&
cfg
);
if
(
pWrapper
->
pProc
==
NULL
)
{
dError
(
"node:%s, failed to create proc since %s"
,
pWrapper
->
name
,
terrstr
());
...
...
@@ -193,7 +195,6 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
dInfo
(
"node:%s, will not start in parent process"
,
pWrapper
->
name
);
// exec new node
pWrapper
->
procType
=
PROC_PARENT
;
if
(
taosProcRun
(
pWrapper
->
pProc
)
!=
0
)
{
dError
(
"node:%s, failed to run proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
...
...
@@ -226,21 +227,22 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
childFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
parentConsumeFp
=
(
ProcConsumeFp
)
dndConsumeParentQueue
,
.
parent
d
MallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentMallocHeadFp
=
(
ProcMallocFp
)
taosMemoryMalloc
,
.
parentFreeHeadFp
=
(
ProcFreeFp
)
taosMemoryFree
,
.
parentMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
shm
,
.
pParent
=
pWrapper
,
.
isChild
=
true
,
.
name
=
pWrapper
->
name
};
pWrapper
->
procType
=
PROC_CHILD
;
pWrapper
->
pProc
=
taosProcInit
(
&
cfg
);
if
(
pWrapper
->
pProc
==
NULL
)
{
dError
(
"node:%s, failed to create proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
}
pWrapper
->
procType
=
PROC_CHILD
;
if
(
taosProcRun
(
pWrapper
->
pProc
)
!=
0
)
{
dError
(
"node:%s, failed to run proc since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/main/src/dndFile.c
浏览文件 @
79936567
...
...
@@ -167,23 +167,23 @@ int32_t dndReadShmFile(SDnode *pDnode) {
for
(
ENodeType
ntype
=
DNODE
+
1
;
ntype
<
NODE_MAX
;
++
ntype
)
{
snprintf
(
itemName
,
sizeof
(
itemName
),
"%s_shmid"
,
dndNodeProcStr
(
ntype
));
cJSON
*
shmid
=
cJSON_GetObjectItem
(
root
,
itemName
);
if
(
shmid
&&
shmid
->
type
==
cJSON_
String
)
{
pDnode
->
wrappers
[
ntype
].
shm
.
id
=
atoi
(
shmid
->
valuestring
)
;
if
(
shmid
&&
shmid
->
type
==
cJSON_
Number
)
{
pDnode
->
wrappers
[
ntype
].
shm
.
id
=
shmid
->
valueint
;
}
snprintf
(
itemName
,
sizeof
(
itemName
),
"%s_shmsize"
,
dndNodeProcStr
(
ntype
));
cJSON
*
shmsize
=
cJSON_GetObjectItem
(
root
,
itemName
);
if
(
shmsize
&&
shmsize
->
type
==
cJSON_
String
)
{
pDnode
->
wrappers
[
ntype
].
shm
.
size
=
atoll
(
shmsize
->
valuestring
)
;
if
(
shmsize
&&
shmsize
->
type
==
cJSON_
Number
)
{
pDnode
->
wrappers
[
ntype
].
shm
.
size
=
shmsize
->
valueint
;
}
}
}
if
(
tsMultiProcess
||
pDnode
->
ntype
==
DNODE
)
{
if
(
!
tsMultiProcess
||
pDnode
->
ntype
==
DNODE
)
{
for
(
ENodeType
ntype
=
DNODE
;
ntype
<
NODE_MAX
;
++
ntype
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
pDnode
->
ntype
];
if
(
pWrapper
->
shm
.
id
>
0
)
{
dDebug
(
"shmid:%d, is closed, size:%
"
PRId64
,
pWrapper
->
shm
.
id
,
pWrapper
->
shm
.
size
);
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
if
(
pWrapper
->
shm
.
id
>
=
0
)
{
dDebug
(
"shmid:%d, is closed, size:%
d"
,
pWrapper
->
shm
.
id
,
pWrapper
->
shm
.
size
);
taosDropShm
(
&
pWrapper
->
shm
);
}
}
...
...
@@ -194,7 +194,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
dError
(
"shmid:%d, failed to attach since %s"
,
pWrapper
->
shm
.
id
,
terrstr
());
goto
_OVER
;
}
dDebug
(
"shmid:%d, is attached, size:%
"
PRId64
,
pWrapper
->
shm
.
id
,
pWrapper
->
shm
.
size
);
dDebug
(
"shmid:%d, is attached, size:%
d"
,
pWrapper
->
shm
.
id
,
pWrapper
->
shm
.
size
);
}
dDebug
(
"successed to open %s"
,
file
);
...
...
@@ -227,14 +227,12 @@ int32_t dndWriteShmFile(SDnode *pDnode) {
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"{
\n
"
);
for
(
ENodeType
ntype
=
DNODE
+
1
;
ntype
<
NODE_MAX
;
++
ntype
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
pDnode
->
ntype
];
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmid
\"
:
\"
%d
\"
,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
id
);
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmid
\"
:
%d
,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
id
);
if
(
ntype
==
NODE_MAX
-
1
)
{
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:
\"
%"
PRId64
"
\"\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
size
);
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:%d
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
size
);
}
else
{
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:
\"
%"
PRId64
"
\"
,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
size
);
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"
\"
%s_shmsize
\"
:%d,
\n
"
,
dndNodeProcStr
(
ntype
),
pWrapper
->
shm
.
size
);
}
}
len
+=
snprintf
(
content
+
len
,
MAXLEN
-
len
,
"}
\n
"
);
...
...
@@ -259,7 +257,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) {
return
-
1
;
}
d
Debug
(
"successed to write %s"
,
realfile
);
d
Info
(
"successed to write %s"
,
realfile
);
code
=
0
;
_OVER:
...
...
source/dnode/mgmt/main/src/dndObj.c
浏览文件 @
79936567
...
...
@@ -91,6 +91,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
snprintf
(
path
,
sizeof
(
path
),
"%s%s%s"
,
pDnode
->
dataDir
,
TD_DIRSEP
,
pWrapper
->
name
);
pWrapper
->
path
=
strdup
(
path
);
pWrapper
->
shm
.
id
=
-
1
;
pWrapper
->
pDnode
=
pDnode
;
if
(
pWrapper
->
path
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/os/src/osShm.c
浏览文件 @
79936567
...
...
@@ -17,8 +17,10 @@
#define _DEFAULT_SOURCE
#include "os.h"
int32_t
taosCreateShm
(
SShm
*
pShm
,
int64_t
shmsize
)
{
int32_t
shmid
=
shmget
(
IPC_PRIVATE
,
(
size_t
)
shmsize
,
IPC_CREAT
|
0600
);
int32_t
taosCreateShm
(
SShm
*
pShm
,
int32_t
shmsize
)
{
pShm
->
id
=
-
1
;
int32_t
shmid
=
shmget
(
0X95279527
,
shmsize
,
IPC_CREAT
|
0600
);
if
(
shmid
<
0
)
{
return
-
1
;
}
...
...
@@ -35,19 +37,19 @@ int32_t taosCreateShm(SShm* pShm, int64_t shmsize) {
}
void
taosDropShm
(
SShm
*
pShm
)
{
if
(
pShm
->
id
>
0
)
{
if
(
pShm
->
id
>
=
0
)
{
if
(
pShm
->
ptr
!=
NULL
)
{
shmdt
(
pShm
->
ptr
);
}
shmctl
(
pShm
->
id
,
IPC_RMID
,
NULL
);
}
pShm
->
id
=
0
;
pShm
->
id
=
-
1
;
pShm
->
size
=
0
;
pShm
->
ptr
=
NULL
;
}
int32_t
taosAttachShm
(
SShm
*
pShm
)
{
if
(
pShm
->
id
>
0
&&
pShm
->
size
>
0
)
{
if
(
pShm
->
id
>
=
0
)
{
pShm
->
ptr
=
shmat
(
pShm
->
id
,
NULL
,
0
);
if
(
pShm
->
ptr
!=
NULL
)
{
return
0
;
...
...
@@ -56,16 +58,3 @@ int32_t taosAttachShm(SShm* pShm) {
return
-
1
;
}
void
taosDetachShm
(
SShm
*
pShm
)
{
if
(
pShm
->
id
>
0
)
{
if
(
pShm
->
ptr
!=
NULL
)
{
shmdt
(
pShm
->
ptr
);
pShm
->
ptr
=
NULL
;
}
}
pShm
->
id
=
0
;
pShm
->
size
=
0
;
pShm
->
ptr
=
NULL
;
}
source/util/src/tprocess.c
浏览文件 @
79936567
...
...
@@ -23,34 +23,36 @@
typedef
void
*
(
*
ProcThreadFp
)(
void
*
param
);
typedef
struct
SProcQueue
{
int32_t
head
;
int32_t
tail
;
int32_t
total
;
int32_t
avail
;
int32_t
items
;
char
*
pBuffer
;
ProcMallocFp
mallocHeadFp
;
ProcFreeFp
freeHeadFp
;
ProcMallocFp
mallocBodyFp
;
ProcFreeFp
freeBodyFp
;
ProcConsumeFp
consumeFp
;
void
*
pParent
;
tsem_t
sem
;
TdThreadMutex
*
mutex
;
int32_t
mutexShmid
;
int32_t
bufferShmid
;
const
char
*
name
;
int32_t
head
;
int32_t
tail
;
int32_t
total
;
int32_t
avail
;
int32_t
items
;
char
name
[
8
];
TdThreadMutex
mutex
;
tsem_t
sem
;
char
pBuffer
[];
}
SProcQueue
;
typedef
struct
SProcObj
{
TdThread
childThread
;
SProcQueue
*
pChildQueue
;
TdThread
parentThread
;
SProcQueue
*
pParentQueue
;
const
char
*
name
;
int32_t
pid
;
bool
isChild
;
bool
stopFlag
;
TdThread
thread
;
SProcQueue
*
pChildQueue
;
SProcQueue
*
pParentQueue
;
ProcConsumeFp
childConsumeFp
;
ProcMallocFp
childMallocHeadFp
;
ProcFreeFp
childFreeHeadFp
;
ProcMallocFp
childMallocBodyFp
;
ProcFreeFp
childFreeBodyFp
;
ProcConsumeFp
parentConsumeFp
;
ProcMallocFp
parentMallocHeadFp
;
ProcFreeFp
parentFreeHeadFp
;
ProcMallocFp
parentMallocBodyFp
;
ProcFreeFp
parentFreeBodyFp
;
void
*
pParent
;
const
char
*
name
;
int32_t
pid
;
bool
isChild
;
bool
stopFlag
;
}
SProcObj
;
static
inline
int32_t
CEIL8
(
int32_t
v
)
{
...
...
@@ -58,150 +60,95 @@ static inline int32_t CEIL8(int32_t v) {
return
c
<
8
?
8
:
c
;
}
static
int32_t
taosProcInitMutex
(
TdThreadMutex
**
ppMutex
,
int32_t
*
pShmid
)
{
TdThreadMutex
*
pMutex
=
NULL
;
static
int32_t
taosProcInitMutex
(
SProcQueue
*
pQueue
)
{
TdThreadMutexAttr
mattr
=
{
0
};
int32_t
shmid
=
-
1
;
int32_t
code
=
-
1
;
if
(
pthread_mutexattr_init
(
&
mattr
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex while init attr since %s"
,
terrstr
());
goto
_OVER
;
return
-
1
;
}
if
(
pthread_mutexattr_setpshared
(
&
mattr
,
PTHREAD_PROCESS_SHARED
)
!=
0
)
{
pthread_mutexattr_destroy
(
&
mattr
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex while set shared since %s"
,
terrstr
());
goto
_OVER
;
}
shmid
=
shmget
(
IPC_PRIVATE
,
sizeof
(
TdThreadMutex
),
IPC_CREAT
|
0600
);
if
(
shmid
<=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex while shmget since %s"
,
terrstr
());
goto
_OVER
;
}
pMutex
=
(
TdThreadMutex
*
)
shmat
(
shmid
,
NULL
,
0
);
if
(
pMutex
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex while shmat since %s"
,
terrstr
());
goto
_OVER
;
return
-
1
;
}
if
(
taosThreadMutexInit
(
pMutex
,
&
mattr
)
!=
0
)
{
if
(
taosThreadMutexInit
(
&
pQueue
->
mutex
,
&
mattr
)
!=
0
)
{
pthread_mutexattr_destroy
(
&
mattr
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init mutex since %s"
,
terrstr
());
goto
_OVER
;
}
code
=
0
;
_OVER:
if
(
code
!=
0
)
{
if
(
pMutex
!=
NULL
)
{
taosThreadMutexDestroy
(
pMutex
);
shmdt
(
pMutex
);
}
if
(
shmid
>=
0
)
{
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
}
}
else
{
*
ppMutex
=
pMutex
;
*
pShmid
=
shmid
;
return
-
1
;
}
pthread_mutexattr_destroy
(
&
mattr
);
return
code
;
}
static
void
taosProcDestroyMutex
(
TdThreadMutex
*
pMutex
,
int32_t
shmid
)
{
if
(
pMutex
!=
NULL
)
{
taosThreadMutexDestroy
(
pMutex
);
}
if
(
shmid
>=
0
)
{
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
}
return
0
;
}
static
int32_t
taosProcInitBuffer
(
void
**
ppBuffer
,
int32_t
size
)
{
int32_t
shmid
=
shmget
(
IPC_PRIVATE
,
size
,
IPC_CREAT
|
0600
);
if
(
shmid
<=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init buffer while shmget since %s"
,
terrstr
());
return
-
1
;
}
void
*
shmptr
=
shmat
(
shmid
,
NULL
,
0
);
if
(
shmptr
==
NULL
)
{
static
int32_t
taosProcInitSem
(
SProcQueue
*
pQueue
)
{
if
(
tsem_init
(
&
pQueue
->
sem
,
1
,
0
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to init buffer while shmat since %s"
,
terrstr
());
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
uError
(
"failed to init sem"
);
return
-
1
;
}
*
ppBuffer
=
shmptr
;
return
shmid
;
return
0
;
}
static
void
taosProcDestroyBuffer
(
void
*
pBuffer
,
int32_t
shmid
)
{
if
(
shmid
>
0
)
{
shmdt
(
pBuffer
);
shmctl
(
shmid
,
IPC_RMID
,
NULL
);
static
SProcQueue
*
taosProcInitQueue
(
const
char
*
name
,
bool
isChild
,
char
*
ptr
,
int32_t
size
)
{
int32_t
bufSize
=
size
-
CEIL8
(
sizeof
(
SProcQueue
));
if
(
bufSize
<=
1024
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
}
static
SProcQueue
*
taosProcInitQueue
(
int32_t
size
)
{
if
(
size
<=
0
)
size
=
SHM_DEFAULT_SIZE
;
SProcQueue
*
pQueue
=
(
SProcQueue
*
)(
ptr
);
int32_t
bufSize
=
CEIL8
(
size
);
int32_t
headSize
=
CEIL8
(
sizeof
(
SProcQueue
));
if
(
!
isChild
)
{
if
(
taosProcInitMutex
(
pQueue
)
!=
0
)
{
return
NULL
;
}
SProcQueue
*
pQueue
=
NULL
;
int32_t
shmId
=
taosProcInitBuffer
((
void
**
)
&
pQueue
,
bufSize
+
headSize
);
if
(
shmId
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pQueue
->
bufferShmid
=
shmId
;
if
(
taosProcInitSem
(
pQueue
)
!=
0
)
{
return
NULL
;
}
if
(
taosProcInitMutex
(
&
pQueue
->
mutex
,
&
pQueue
->
mutexShmid
)
!=
0
)
{
taosProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
return
NULL
;
tstrncpy
(
pQueue
->
name
,
name
,
sizeof
(
pQueue
->
name
));
pQueue
->
head
=
0
;
pQueue
->
tail
=
0
;
pQueue
->
total
=
bufSize
;
pQueue
->
avail
=
bufSize
;
pQueue
->
items
=
0
;
}
if
(
tsem_init
(
&
pQueue
->
sem
,
1
,
0
)
!=
0
)
{
taosProcDestroyMutex
(
pQueue
->
mutex
,
pQueue
->
mutexShmid
);
taosProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
pQueue
;
}
#if 0
static void taosProcDestroyMutex(SProcQueue *pQueue) {
if (pQueue->mutex != NULL) {
taosThreadMutexDestroy(pQueue->mutex);
pQueue->mutex = NULL;
}
}
if
(
taosProcInitMutex
(
&
pQueue
->
mutex
,
&
pQueue
->
mutexShmid
)
!=
0
)
{
tsem_destroy
(
&
pQueue
->
sem
);
taosProcDestroyMutex
(
pQueue
->
mutex
,
pQueue
->
mutexShmid
);
taosProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
return
NULL
;
static void taosProcDestroySem(SProcQueue *pQueue) {
if (pQueue->sem != NULL) {
tsem_destroy(pQueue->sem);
pQueue->sem = NULL;
}
pQueue
->
head
=
0
;
pQueue
->
tail
=
0
;
pQueue
->
total
=
bufSize
;
pQueue
->
avail
=
bufSize
;
pQueue
->
items
=
0
;
pQueue
->
pBuffer
=
(
char
*
)
pQueue
+
headSize
;
return
pQueue
;
}
static void taosProcCleanupQueue(SProcQueue *pQueue) {
if (pQueue != NULL) {
uDebug
(
"proc:%s, queue:%p clean up"
,
pQueue
->
name
,
pQueue
);
tsem_destroy
(
&
pQueue
->
sem
);
taosProcDestroyMutex
(
pQueue
->
mutex
,
pQueue
->
mutexShmid
);
taosProcDestroyBuffer
(
pQueue
,
pQueue
->
bufferShmid
);
taosProcDestroyMutex(pQueue);
taosProcDestroySem(pQueue);
}
}
#endif
static
int32_t
taosProcQueuePush
(
SProcQueue
*
pQueue
,
const
char
*
pHead
,
int16_t
rawHeadLen
,
const
char
*
pBody
,
int32_t
rawBodyLen
,
ProcFuncType
ftype
)
{
...
...
@@ -209,9 +156,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
const
int32_t
bodyLen
=
CEIL8
(
rawBodyLen
);
const
int32_t
fullLen
=
headLen
+
bodyLen
+
8
;
taosThreadMutexLock
(
pQueue
->
mutex
);
taosThreadMutexLock
(
&
pQueue
->
mutex
);
if
(
fullLen
>
pQueue
->
avail
)
{
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
terrno
=
TSDB_CODE_OUT_OF_SHM_MEM
;
return
-
1
;
}
...
...
@@ -260,7 +207,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
pQueue
->
avail
-=
fullLen
;
pQueue
->
items
++
;
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
tsem_post
(
&
pQueue
->
sem
);
uTrace
(
"proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p"
,
pQueue
->
name
,
pos
,
ftype
,
...
...
@@ -268,13 +215,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
return
0
;
}
static
int32_t
taosProcQueuePop
(
SProcQueue
*
pQueue
,
void
**
ppHead
,
int16_t
*
pHeadLen
,
void
**
ppBody
,
int32_t
*
pBodyLen
,
ProcFuncType
*
pFuncType
)
{
static
int32_t
taosProcQueuePop
(
SProcQueue
*
pQueue
,
void
**
ppHead
,
int16_t
*
pHeadLen
,
void
**
ppBody
,
int32_t
*
pBodyLen
,
ProcFuncType
*
pFuncType
,
ProcMallocFp
mallocHeadFp
,
ProcFreeFp
freeHeadFp
,
ProcMallocFp
mallocBodyFp
,
ProcFreeFp
freeBodyFp
)
{
tsem_wait
(
&
pQueue
->
sem
);
taosThreadMutexLock
(
pQueue
->
mutex
);
taosThreadMutexLock
(
&
pQueue
->
mutex
);
if
(
pQueue
->
total
-
pQueue
->
avail
<=
0
)
{
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
tsem_post
(
&
pQueue
->
sem
);
terrno
=
TSDB_CODE_OUT_OF_SHM_MEM
;
return
0
;
...
...
@@ -293,13 +241,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
bodyLen
=
*
(
int32_t
*
)(
pQueue
->
pBuffer
+
4
);
}
void
*
pHead
=
(
*
pQueue
->
mallocHeadFp
)(
headLen
);
void
*
pBody
=
(
*
pQueue
->
mallocBodyFp
)(
bodyLen
);
void
*
pHead
=
(
*
mallocHeadFp
)(
headLen
);
void
*
pBody
=
(
*
mallocBodyFp
)(
bodyLen
);
if
(
pHead
==
NULL
||
pBody
==
NULL
)
{
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
tsem_post
(
&
pQueue
->
sem
);
(
*
pQueue
->
freeHeadFp
)(
pHead
);
(
*
pQueue
->
freeBodyFp
)(
pBody
);
(
*
freeHeadFp
)(
pHead
);
(
*
freeBodyFp
)(
pBody
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
...
...
@@ -338,7 +286,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
pQueue
->
avail
=
pQueue
->
avail
+
headLen
+
bodyLen
+
8
;
pQueue
->
items
--
;
taosThreadMutexUnlock
(
pQueue
->
mutex
);
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
*
ppHead
=
pHead
;
*
ppBody
=
pBody
;
...
...
@@ -358,65 +306,85 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
return
NULL
;
}
int32_t
cstart
=
0
;
int32_t
csize
=
CEIL8
(
pCfg
->
shm
.
size
/
2
);
int32_t
pstart
=
csize
;
int32_t
psize
=
CEIL8
(
pCfg
->
shm
.
size
-
pstart
);
if
(
pstart
+
psize
>
pCfg
->
shm
.
size
)
{
psize
-=
8
;
}
pProc
->
name
=
pCfg
->
name
;
pProc
->
pChildQueue
=
taosProcInitQueue
(
pCfg
->
shm
.
size
/
2
);
pProc
->
pParentQueue
=
taosProcInitQueue
(
pCfg
->
shm
.
size
/
2
);
pProc
->
pChildQueue
=
taosProcInitQueue
(
pCfg
->
name
,
pCfg
->
isChild
,
(
char
*
)
pCfg
->
shm
.
ptr
+
cstart
,
csize
);
pProc
->
pParentQueue
=
taosProcInitQueue
(
pCfg
->
name
,
pCfg
->
isChild
,
(
char
*
)
pCfg
->
shm
.
ptr
+
pstart
,
psize
);
if
(
pProc
->
pChildQueue
==
NULL
||
pProc
->
pParentQueue
==
NULL
)
{
taosProcCleanupQueue
(
pProc
->
pChildQueue
);
taosMemoryFree
(
pProc
);
return
NULL
;
}
pProc
->
pChildQueue
->
name
=
pCfg
->
name
;
pProc
->
pChildQueue
->
pParent
=
pCfg
->
pParent
;
pProc
->
pChildQueue
->
mallocHeadFp
=
pCfg
->
childMallocHeadFp
;
pProc
->
pChildQueue
->
freeHeadFp
=
pCfg
->
childFreeHeadFp
;
pProc
->
pChildQueue
->
mallocBodyFp
=
pCfg
->
childMallocBodyFp
;
pProc
->
pChildQueue
->
freeBodyFp
=
pCfg
->
childFreeBodyFp
;
pProc
->
pChildQueue
->
consumeFp
=
pCfg
->
childConsumeFp
;
pProc
->
pParentQueue
->
name
=
pCfg
->
name
;
pProc
->
pParentQueue
->
pParent
=
pCfg
->
pParent
;
pProc
->
pParentQueue
->
mallocHeadFp
=
pCfg
->
parentdMallocHeadFp
;
pProc
->
pParentQueue
->
freeHeadFp
=
pCfg
->
parentFreeHeadFp
;
pProc
->
pParentQueue
->
mallocBodyFp
=
pCfg
->
parentMallocBodyFp
;
pProc
->
pParentQueue
->
freeBodyFp
=
pCfg
->
parentFreeBodyFp
;
pProc
->
pParentQueue
->
consumeFp
=
pCfg
->
parentConsumeFp
;
uDebug
(
"proc:%s, is initialized, child queue:%p parent queue:%p"
,
pProc
->
name
,
pProc
->
pChildQueue
,
pProc
->
pParentQueue
);
pProc
->
pid
=
fork
();
if
(
pProc
->
pid
==
0
)
{
pProc
->
isChild
=
1
;
prctl
(
PR_SET_NAME
,
pProc
->
name
,
NULL
,
NULL
,
NULL
);
}
else
{
pProc
->
isChild
=
0
;
uInfo
(
"this is parent process, child pid:%d"
,
pProc
->
pid
);
}
pProc
->
name
=
pCfg
->
name
;
pProc
->
pParent
=
pCfg
->
pParent
;
pProc
->
childMallocHeadFp
=
pCfg
->
childMallocHeadFp
;
pProc
->
childFreeHeadFp
=
pCfg
->
childFreeHeadFp
;
pProc
->
childMallocBodyFp
=
pCfg
->
childMallocBodyFp
;
pProc
->
childFreeBodyFp
=
pCfg
->
childFreeBodyFp
;
pProc
->
childConsumeFp
=
pCfg
->
childConsumeFp
;
pProc
->
parentMallocHeadFp
=
pCfg
->
parentMallocHeadFp
;
pProc
->
parentFreeHeadFp
=
pCfg
->
parentFreeHeadFp
;
pProc
->
parentMallocBodyFp
=
pCfg
->
parentMallocBodyFp
;
pProc
->
parentFreeBodyFp
=
pCfg
->
parentFreeBodyFp
;
pProc
->
parentConsumeFp
=
pCfg
->
parentConsumeFp
;
pProc
->
isChild
=
pCfg
->
isChild
;
uDebug
(
"proc:%s, is initialized, child:%d child queue:%p parent queue:%p"
,
pProc
->
name
,
pProc
->
isChild
,
pProc
->
pChildQueue
,
pProc
->
pParentQueue
);
return
pProc
;
}
static
void
taosProcThreadLoop
(
SProcQueue
*
pQueue
)
{
ProcConsumeFp
consumeFp
=
pQueue
->
consumeFp
;
void
*
pParent
=
pQueue
->
pParent
;
static
void
taosProcThreadLoop
(
SProcObj
*
pProc
)
{
void
*
pHead
,
*
pBody
;
int16_t
headLen
;
ProcFuncType
ftype
;
int32_t
bodyLen
;
SProcQueue
*
pQueue
;
ProcConsumeFp
consumeFp
;
ProcMallocFp
mallocHeadFp
;
ProcFreeFp
freeHeadFp
;
ProcMallocFp
mallocBodyFp
;
ProcFreeFp
freeBodyFp
;
uDebug
(
"proc:%s, start to get msg from queue:%p"
,
pQueue
->
name
,
pQueue
);
if
(
pProc
->
isChild
)
{
pQueue
=
pProc
->
pChildQueue
;
consumeFp
=
pProc
->
childConsumeFp
;
mallocHeadFp
=
pProc
->
childMallocHeadFp
;
freeHeadFp
=
pProc
->
childFreeHeadFp
;
mallocBodyFp
=
pProc
->
childMallocBodyFp
;
freeBodyFp
=
pProc
->
childFreeBodyFp
;
}
else
{
pQueue
=
pProc
->
pParentQueue
;
consumeFp
=
pProc
->
parentConsumeFp
;
mallocHeadFp
=
pProc
->
parentMallocHeadFp
;
freeHeadFp
=
pProc
->
parentFreeHeadFp
;
mallocBodyFp
=
pProc
->
parentMallocBodyFp
;
freeBodyFp
=
pProc
->
parentFreeBodyFp
;
}
uDebug
(
"proc:%s, start to get msg from queue:%p"
,
pProc
->
name
,
pQueue
);
while
(
1
)
{
int32_t
numOfMsgs
=
taosProcQueuePop
(
pQueue
,
&
pHead
,
&
headLen
,
&
pBody
,
&
bodyLen
,
&
ftype
);
int32_t
numOfMsgs
=
taosProcQueuePop
(
pQueue
,
&
pHead
,
&
headLen
,
&
pBody
,
&
bodyLen
,
&
ftype
,
mallocHeadFp
,
freeHeadFp
,
mallocBodyFp
,
freeBodyFp
);
if
(
numOfMsgs
==
0
)
{
u
Debug
(
"proc:%s, get no msg from queue:%p and exit the proc thread"
,
pQueue
->
name
,
pQueue
);
u
Info
(
"proc:%s, get no msg from queue:%p and exit the proc thread"
,
pProc
->
name
,
pQueue
);
break
;
}
else
if
(
numOfMsgs
<
0
)
{
uTrace
(
"proc:%s, get no msg from queue:%p since %s"
,
p
Queue
->
name
,
pQueue
,
terrstr
());
uTrace
(
"proc:%s, get no msg from queue:%p since %s"
,
p
Proc
->
name
,
pQueue
,
terrstr
());
taosMsleep
(
1
);
continue
;
}
else
{
(
*
consumeFp
)(
pParent
,
pHead
,
headLen
,
pBody
,
bodyLen
,
ftype
);
(
*
consumeFp
)(
pP
roc
->
pP
arent
,
pHead
,
headLen
,
pBody
,
bodyLen
,
ftype
);
}
}
}
...
...
@@ -426,33 +394,29 @@ int32_t taosProcRun(SProcObj *pProc) {
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pProc
->
isChild
)
{
if
(
taosThreadCreate
(
&
pProc
->
childThread
,
&
thAttr
,
(
ProcThreadFp
)
taosProcThreadLoop
,
pProc
->
pChildQueue
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to create thread since %s"
,
terrstr
());
return
-
1
;
}
uDebug
(
"proc:%s, child start to consume queue:%p"
,
pProc
->
name
,
pProc
->
pChildQueue
);
}
else
{
if
(
taosThreadCreate
(
&
pProc
->
parentThread
,
&
thAttr
,
(
ProcThreadFp
)
taosProcThreadLoop
,
pProc
->
pParentQueue
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to create thread since %s"
,
terrstr
());
return
-
1
;
}
uDebug
(
"proc:%s, parent start to consume queue:%p"
,
pProc
->
name
,
pProc
->
pParentQueue
);
if
(
taosThreadCreate
(
&
pProc
->
thread
,
&
thAttr
,
(
ProcThreadFp
)
taosProcThreadLoop
,
pProc
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to create thread since %s"
,
terrstr
());
return
-
1
;
}
uDebug
(
"proc:%s, start to consume queue:%p"
,
pProc
->
name
,
pProc
->
pChildQueue
);
return
0
;
}
void
taosProcStop
(
SProcObj
*
pProc
)
{
pProc
->
stopFlag
=
true
;
// todo join
}
bool
taosProcIsChild
(
SProcObj
*
pProc
)
{
return
pProc
->
isChild
;
}
static
void
taosProcStop
(
SProcObj
*
pProc
)
{
if
(
!
taosCheckPthreadValid
(
pProc
->
thread
))
return
;
int32_t
taosProcChildId
(
SProcObj
*
pProc
)
{
return
pProc
->
pid
;
}
uDebug
(
"proc:%s, start to join thread"
,
pProc
->
name
);
SProcQueue
*
pQueue
;
if
(
pProc
->
isChild
)
{
pQueue
=
pProc
->
pParentQueue
;
}
else
{
pQueue
=
pProc
->
pChildQueue
;
}
tsem_post
(
&
pQueue
->
sem
);
taosThreadJoin
(
pProc
->
thread
,
NULL
);
}
void
taosProcCleanup
(
SProcObj
*
pProc
)
{
if
(
pProc
!=
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录