Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d60941d4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d60941d4
编写于
12月 28, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add bnode
上级
6f7eecb2
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
778 addition
and
31 deletion
+778
-31
include/dnode/bnode/bnode.h
include/dnode/bnode/bnode.h
+9
-1
include/dnode/snode/snode.h
include/dnode/snode/snode.h
+9
-1
source/dnode/bnode/src/bnode.c
source/dnode/bnode/src/bnode.c
+3
-1
source/dnode/mgmt/impl/inc/dndBnode.h
source/dnode/mgmt/impl/inc/dndBnode.h
+1
-1
source/dnode/mgmt/impl/inc/dndInt.h
source/dnode/mgmt/impl/inc/dndInt.h
+11
-14
source/dnode/mgmt/impl/inc/dndWorker.h
source/dnode/mgmt/impl/inc/dndWorker.h
+2
-2
source/dnode/mgmt/impl/src/dndBnode.c
source/dnode/mgmt/impl/src/dndBnode.c
+369
-0
source/dnode/mgmt/impl/src/dndQnode.c
source/dnode/mgmt/impl/src/dndQnode.c
+0
-1
source/dnode/mgmt/impl/src/dndSnode.c
source/dnode/mgmt/impl/src/dndSnode.c
+344
-0
source/dnode/mgmt/impl/src/dndWorker.c
source/dnode/mgmt/impl/src/dndWorker.c
+27
-9
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+3
-1
未找到文件。
include/dnode/bnode/bnode.h
浏览文件 @
d60941d4
...
...
@@ -49,10 +49,11 @@ typedef struct {
/**
* @brief Start one Bnode in Dnode.
*
* @param path Path of the bnode.
* @param pOption Option of the bnode.
* @return SBnode* The bnode object.
*/
SBnode
*
bndOpen
(
const
SBnodeOpt
*
pOption
);
SBnode
*
bndOpen
(
const
char
*
path
,
const
SBnodeOpt
*
pOption
);
/**
* @brief Stop Bnode in Dnode.
...
...
@@ -79,6 +80,13 @@ int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad);
*/
int32_t
bndProcessWMsgs
(
SBnode
*
pBnode
,
SArray
*
pMsgs
);
/**
* @brief Drop a bnode.
*
* @param path Path of the bnode.
*/
void
bndDestroy
(
const
char
*
path
);
#ifdef __cplusplus
}
#endif
...
...
include/dnode/snode/snode.h
浏览文件 @
d60941d4
...
...
@@ -49,10 +49,11 @@ typedef struct {
/**
* @brief Start one Snode in Dnode.
*
* @param path Path of the snode.
* @param pOption Option of the snode.
* @return SSnode* The snode object.
*/
SSnode
*
sndOpen
(
const
SSnodeOpt
*
pOption
);
SSnode
*
sndOpen
(
const
char
*
path
,
const
SSnodeOpt
*
pOption
);
/**
* @brief Stop Snode in Dnode.
...
...
@@ -80,6 +81,13 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
*/
int32_t
sndProcessWriteMsg
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
/**
* @brief Drop a snode.
*
* @param path Path of the snode.
*/
void
sndDestroy
(
const
char
*
path
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/bnode/src/bnode.c
浏览文件 @
d60941d4
...
...
@@ -15,7 +15,7 @@
#include "bndInt.h"
SBnode
*
bndOpen
(
const
SBnodeOpt
*
pOption
)
{
SBnode
*
bndOpen
(
const
char
*
path
,
const
SBnodeOpt
*
pOption
)
{
SBnode
*
pBnode
=
calloc
(
1
,
sizeof
(
SBnode
));
return
pBnode
;
}
...
...
@@ -25,3 +25,5 @@ void bndClose(SBnode *pBnode) { free(pBnode); }
int32_t
bndGetLoad
(
SBnode
*
pBnode
,
SBnodeLoad
*
pLoad
)
{
return
0
;
}
int32_t
bndProcessWMsgs
(
SBnode
*
pBnode
,
SArray
*
pMsgs
)
{
return
0
;
}
void
bndDestroy
(
const
char
*
path
)
{}
source/dnode/mgmt/impl/inc/dndBnode.h
浏览文件 @
d60941d4
...
...
@@ -24,7 +24,7 @@ extern "C" {
int32_t
dndInitBnode
(
SDnode
*
pDnode
);
void
dndCleanupBnode
(
SDnode
*
pDnode
);
i
oid
dndProcessBnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
v
oid
dndProcessBnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
int32_t
dndProcessCreateBnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
dndProcessDropBnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
...
...
source/dnode/mgmt/impl/inc/dndInt.h
浏览文件 @
d60941d4
...
...
@@ -54,18 +54,19 @@ extern int32_t dDebugFlag;
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef
enum
{
DND_STAT_INIT
,
DND_STAT_RUNNING
,
DND_STAT_STOPPED
}
EStat
;
typedef
enum
{
DND_WORKER_SINGLE
,
DND_WORKER_MULTI
}
E
Dnd
WorkerType
;
typedef
enum
{
DND_WORKER_SINGLE
,
DND_WORKER_MULTI
}
EWorkerType
;
typedef
void
(
*
DndMsgFp
)(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEps
);
typedef
struct
{
E
DndWorkerType
type
;
E
WorkerType
type
;
const
char
*
name
;
int32_t
minNum
;
int32_t
maxNum
;
FProcessItem
f
p
;
void
*
queueF
p
;
SDnode
*
pDnode
;
taos_queue
queue
;
SWorkerPool
pool
;
SMWorkerPool
mpool
;
}
SDnodeWorker
;
typedef
struct
{
...
...
@@ -122,25 +123,21 @@ typedef struct {
}
SQnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
char
*
file
;
SSnode
*
pSnode
;
SRWLatch
latch
;
taos_queue
pWriteQ
;
SWorkerPool
writePool
;
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SSnode
*
pSnode
;
SRWLatch
latch
;
SDnodeWorker
writeWorker
;
}
SSnodeMgmt
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
char
*
file
;
SBnode
*
pBnode
;
SRWLatch
latch
;
taos_queue
pWriteQ
;
SMWorkerPool
writePool
;
SDnodeWorker
writeWorker
;
}
SBnodeMgmt
;
typedef
struct
{
...
...
source/dnode/mgmt/impl/inc/dndWorker.h
浏览文件 @
d60941d4
...
...
@@ -21,8 +21,8 @@ extern "C" {
#endif
#include "dndInt.h"
int32_t
dndInitWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
E
Dnd
WorkerType
type
,
const
char
*
name
,
int32_t
minNum
,
int32_t
maxNum
,
FProcessItem
f
p
);
int32_t
dndInitWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
EWorkerType
type
,
const
char
*
name
,
int32_t
minNum
,
int32_t
maxNum
,
void
*
queueF
p
);
void
dndCleanupWorker
(
SDnodeWorker
*
pWorker
);
int32_t
dndWriteMsgToWorker
(
SDnodeWorker
*
pWorker
,
void
*
pCont
,
int32_t
contLen
);
...
...
source/dnode/mgmt/impl/src/dndBnode.c
0 → 100644
浏览文件 @
d60941d4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndBnode.h"
#include "dndDnode.h"
#include "dndTransport.h"
#include "dndWorker.h"
static
void
dndProcessBnodeQueue
(
SDnode
*
pDnode
,
taos_qall
qall
,
int32_t
numOfMsgs
);
static
SBnode
*
dndAcquireBnode
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
SBnode
*
pBnode
=
NULL
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
if
(
pMgmt
->
deployed
&&
!
pMgmt
->
dropped
)
{
refCount
=
atomic_add_fetch_32
(
&
pMgmt
->
refCount
,
1
);
pBnode
=
pMgmt
->
pBnode
;
}
else
{
terrno
=
TSDB_CODE_DND_BNODE_NOT_DEPLOYED
;
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
pBnode
!=
NULL
)
{
dTrace
(
"acquire bnode, refCount:%d"
,
refCount
);
}
return
pBnode
;
}
static
void
dndReleaseBnode
(
SDnode
*
pDnode
,
SBnode
*
pBnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
if
(
pBnode
!=
NULL
)
{
refCount
=
atomic_sub_fetch_32
(
&
pMgmt
->
refCount
,
1
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
pBnode
!=
NULL
)
{
dTrace
(
"release bnode, refCount:%d"
,
refCount
);
}
}
static
int32_t
dndReadBnodeFile
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
int32_t
code
=
TSDB_CODE_DND_BNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
+
20
];
snprintf
(
file
,
PATH_MAX
+
20
,
"%s/bnode.json"
,
pDnode
->
dir
.
dnode
);
FILE
*
fp
=
fopen
(
file
,
"r"
);
if
(
fp
==
NULL
)
{
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
goto
PRASE_BNODE_OVER
;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_BNODE_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_BNODE_OVER
;
}
cJSON
*
deployed
=
cJSON_GetObjectItem
(
root
,
"deployed"
);
if
(
!
deployed
||
deployed
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since deployed not found"
,
file
);
goto
PRASE_BNODE_OVER
;
}
pMgmt
->
deployed
=
deployed
->
valueint
;
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_BNODE_OVER
;
}
pMgmt
->
dropped
=
dropped
->
valueint
;
code
=
0
;
dDebug
(
"succcessed to read file %s, deployed:%d dropped:%d"
,
file
,
pMgmt
->
deployed
,
pMgmt
->
dropped
);
PRASE_BNODE_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
terrno
=
code
;
return
code
;
}
static
int32_t
dndWriteBnodeFile
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
char
file
[
PATH_MAX
+
20
];
snprintf
(
file
,
PATH_MAX
+
20
,
"%s/bnode.json"
,
pDnode
->
dir
.
dnode
);
FILE
*
fp
=
fopen
(
file
,
"w"
);
if
(
fp
==
NULL
)
{
terrno
=
TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR
;
dError
(
"failed to write %s since %s"
,
file
,
terrstr
());
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
: %d,
\n
"
,
pMgmt
->
deployed
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d
\n
"
,
pMgmt
->
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
taosFsyncFile
(
fileno
(
fp
));
fclose
(
fp
);
free
(
content
);
if
(
taosRenameFile
(
file
,
file
)
!=
0
)
{
terrno
=
TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR
;
dError
(
"failed to rename %s since %s"
,
file
,
terrstr
());
return
-
1
;
}
dInfo
(
"successed to write %s, deployed:%d dropped:%d"
,
file
,
pMgmt
->
deployed
,
pMgmt
->
dropped
);
return
0
;
}
static
int32_t
dndStartBnodeWorker
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
if
(
dndInitWorker
(
pDnode
,
&
pMgmt
->
writeWorker
,
DND_WORKER_SINGLE
,
"bnode-write"
,
0
,
1
,
(
FProcessItem
)
dndProcessBnodeQueue
)
!=
0
)
{
dError
(
"failed to start bnode write worker since %s"
,
terrstr
());
return
-
1
;
}
return
0
;
}
static
void
dndStopBnodeWorker
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
taosWLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
deployed
=
0
;
taosWUnLockLatch
(
&
pMgmt
->
latch
);
while
(
pMgmt
->
refCount
>
1
)
{
taosMsleep
(
10
);
}
dndCleanupWorker
(
&
pMgmt
->
writeWorker
);
}
static
void
dndBuildBnodeOption
(
SDnode
*
pDnode
,
SBnodeOpt
*
pOption
)
{
pOption
->
pDnode
=
pDnode
;
pOption
->
sendMsgToDnodeFp
=
dndSendMsgToDnode
;
pOption
->
sendMsgToMnodeFp
=
dndSendMsgToMnode
;
pOption
->
sendRedirectMsgFp
=
dndSendRedirectMsg
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
cfg
.
sver
=
pDnode
->
opt
.
sver
;
}
static
int32_t
dndOpenBnode
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
SBnodeOpt
option
=
{
0
};
dndBuildBnodeOption
(
pDnode
,
&
option
);
SBnode
*
pBnode
=
bndOpen
(
pDnode
->
dir
.
bnode
,
&
option
);
if
(
pBnode
==
NULL
)
{
dError
(
"failed to open bnode since %s"
,
terrstr
());
return
-
1
;
}
if
(
dndStartBnodeWorker
(
pDnode
)
!=
0
)
{
dError
(
"failed to start bnode worker since %s"
,
terrstr
());
bndClose
(
pBnode
);
return
-
1
;
}
if
(
dndWriteBnodeFile
(
pDnode
)
!=
0
)
{
dError
(
"failed to write bnode file since %s"
,
terrstr
());
dndStopBnodeWorker
(
pDnode
);
bndClose
(
pBnode
);
return
-
1
;
}
taosWLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
pBnode
=
pBnode
;
pMgmt
->
deployed
=
1
;
taosWUnLockLatch
(
&
pMgmt
->
latch
);
dInfo
(
"bnode open successfully"
);
return
0
;
}
static
int32_t
dndDropBnode
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
SBnode
*
pBnode
=
dndAcquireBnode
(
pDnode
);
if
(
pBnode
==
NULL
)
{
dError
(
"failed to drop bnode since %s"
,
terrstr
());
return
-
1
;
}
taosRLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dropped
=
1
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
dndWriteBnodeFile
(
pDnode
)
!=
0
)
{
taosRLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dropped
=
0
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
dndReleaseBnode
(
pDnode
,
pBnode
);
dError
(
"failed to drop bnode since %s"
,
terrstr
());
return
-
1
;
}
dndReleaseBnode
(
pDnode
,
pBnode
);
dndStopBnodeWorker
(
pDnode
);
bndClose
(
pBnode
);
pMgmt
->
pBnode
=
NULL
;
bndDestroy
(
pDnode
->
dir
.
bnode
);
return
0
;
}
int32_t
dndProcessCreateBnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
SCreateBnodeInMsg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_BNODE_ID_INVALID
;
return
-
1
;
}
else
{
return
dndOpenBnode
(
pDnode
);
}
}
int32_t
dndProcessDropBnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
SDropBnodeInMsg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_BNODE_ID_INVALID
;
return
-
1
;
}
else
{
return
dndDropBnode
(
pDnode
);
}
}
static
void
dndSendBnodeErrorRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
code
=
code
};
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
void
dndSendBnodeErrorRsps
(
taos_qall
qall
,
int32_t
numOfMsgs
,
int32_t
code
)
{
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
SRpcMsg
*
pMsg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
dndSendBnodeErrorRsp
(
pMsg
,
code
);
}
}
static
void
dndProcessBnodeQueue
(
SDnode
*
pDnode
,
taos_qall
qall
,
int32_t
numOfMsgs
)
{
SBnode
*
pBnode
=
dndAcquireBnode
(
pDnode
);
if
(
pBnode
==
NULL
)
{
dndSendBnodeErrorRsps
(
qall
,
numOfMsgs
,
TSDB_CODE_OUT_OF_MEMORY
);
return
;
}
SArray
*
pArray
=
taosArrayInit
(
numOfMsgs
,
sizeof
(
SRpcMsg
*
));
if
(
pArray
==
NULL
)
{
dndReleaseBnode
(
pDnode
,
pBnode
);
dndSendBnodeErrorRsps
(
qall
,
numOfMsgs
,
TSDB_CODE_OUT_OF_MEMORY
);
return
;
}
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
SRpcMsg
*
pMsg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
void
*
ptr
=
taosArrayPush
(
pArray
,
&
pMsg
);
if
(
ptr
==
NULL
)
{
dndSendBnodeErrorRsp
(
pMsg
,
TSDB_CODE_OUT_OF_MEMORY
);
}
}
bndProcessWMsgs
(
pBnode
,
pArray
);
for
(
size_t
i
=
0
;
i
<
numOfMsgs
;
i
++
)
{
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
i
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
taosArrayDestroy
(
pArray
);
dndReleaseBnode
(
pDnode
,
pBnode
);
}
static
void
dndWriteBnodeMsgToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
TSDB_CODE_DND_BNODE_NOT_DEPLOYED
;
SBnode
*
pBnode
=
dndAcquireBnode
(
pDnode
);
if
(
pBnode
!=
NULL
)
{
code
=
dndWriteMsgToWorker
(
pWorker
,
pMsg
,
sizeof
(
SRpcMsg
));
}
dndReleaseBnode
(
pDnode
,
pBnode
);
if
(
code
!=
0
)
{
if
(
pMsg
->
msgType
&
1u
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
code
=
code
};
rpcSendResponse
(
&
rsp
);
}
rpcFreeCont
(
pMsg
->
pCont
);
}
}
void
dndProcessBnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
dndWriteBnodeMsgToWorker
(
pDnode
,
&
pDnode
->
bmgmt
.
writeWorker
,
pMsg
);
}
int32_t
dndInitBnode
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
if
(
dndReadBnodeFile
(
pDnode
)
!=
0
)
{
return
-
1
;
}
if
(
pMgmt
->
dropped
)
return
0
;
if
(
!
pMgmt
->
deployed
)
return
0
;
return
dndOpenBnode
(
pDnode
);
}
void
dndCleanupBnode
(
SDnode
*
pDnode
)
{
SBnodeMgmt
*
pMgmt
=
&
pDnode
->
bmgmt
;
if
(
pMgmt
->
pBnode
)
{
dndStopBnodeWorker
(
pDnode
);
bndClose
(
pMgmt
->
pBnode
);
pMgmt
->
pBnode
=
NULL
;
}
}
source/dnode/mgmt/impl/src/dndQnode.c
浏览文件 @
d60941d4
...
...
@@ -252,7 +252,6 @@ static int32_t dndDropQnode(SDnode *pDnode) {
dndStopQnodeWorker
(
pDnode
);
qndClose
(
pQnode
);
pMgmt
->
pQnode
=
NULL
;
// qndDestroy(pDnode->dir.qnode);
return
0
;
}
...
...
source/dnode/mgmt/impl/src/dndSnode.c
0 → 100644
浏览文件 @
d60941d4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndSnode.h"
#include "dndDnode.h"
#include "dndTransport.h"
#include "dndWorker.h"
static
void
dndProcessSnodeQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
static
SSnode
*
dndAcquireSnode
(
SDnode
*
pDnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
SSnode
*
pSnode
=
NULL
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
if
(
pMgmt
->
deployed
&&
!
pMgmt
->
dropped
)
{
refCount
=
atomic_add_fetch_32
(
&
pMgmt
->
refCount
,
1
);
pSnode
=
pMgmt
->
pSnode
;
}
else
{
terrno
=
TSDB_CODE_DND_SNODE_NOT_DEPLOYED
;
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
pSnode
!=
NULL
)
{
dTrace
(
"acquire snode, refCount:%d"
,
refCount
);
}
return
pSnode
;
}
static
void
dndReleaseSnode
(
SDnode
*
pDnode
,
SSnode
*
pSnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
if
(
pSnode
!=
NULL
)
{
refCount
=
atomic_sub_fetch_32
(
&
pMgmt
->
refCount
,
1
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
pSnode
!=
NULL
)
{
dTrace
(
"release snode, refCount:%d"
,
refCount
);
}
}
static
int32_t
dndReadSnodeFile
(
SDnode
*
pDnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
int32_t
code
=
TSDB_CODE_DND_SNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
+
20
];
snprintf
(
file
,
PATH_MAX
+
20
,
"%s/snode.json"
,
pDnode
->
dir
.
dnode
);
FILE
*
fp
=
fopen
(
file
,
"r"
);
if
(
fp
==
NULL
)
{
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
goto
PRASE_SNODE_OVER
;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_SNODE_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_SNODE_OVER
;
}
cJSON
*
deployed
=
cJSON_GetObjectItem
(
root
,
"deployed"
);
if
(
!
deployed
||
deployed
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since deployed not found"
,
file
);
goto
PRASE_SNODE_OVER
;
}
pMgmt
->
deployed
=
deployed
->
valueint
;
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_SNODE_OVER
;
}
pMgmt
->
dropped
=
dropped
->
valueint
;
code
=
0
;
dDebug
(
"succcessed to read file %s, deployed:%d dropped:%d"
,
file
,
pMgmt
->
deployed
,
pMgmt
->
dropped
);
PRASE_SNODE_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
terrno
=
code
;
return
code
;
}
static
int32_t
dndWriteSnodeFile
(
SDnode
*
pDnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
char
file
[
PATH_MAX
+
20
];
snprintf
(
file
,
PATH_MAX
+
20
,
"%s/snode.json"
,
pDnode
->
dir
.
dnode
);
FILE
*
fp
=
fopen
(
file
,
"w"
);
if
(
fp
==
NULL
)
{
terrno
=
TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR
;
dError
(
"failed to write %s since %s"
,
file
,
terrstr
());
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
: %d,
\n
"
,
pMgmt
->
deployed
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d
\n
"
,
pMgmt
->
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
taosFsyncFile
(
fileno
(
fp
));
fclose
(
fp
);
free
(
content
);
if
(
taosRenameFile
(
file
,
file
)
!=
0
)
{
terrno
=
TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR
;
dError
(
"failed to rename %s since %s"
,
file
,
terrstr
());
return
-
1
;
}
dInfo
(
"successed to write %s, deployed:%d dropped:%d"
,
file
,
pMgmt
->
deployed
,
pMgmt
->
dropped
);
return
0
;
}
static
int32_t
dndStartSnodeWorker
(
SDnode
*
pDnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
if
(
dndInitWorker
(
pDnode
,
&
pMgmt
->
writeWorker
,
DND_WORKER_SINGLE
,
"snode-write"
,
0
,
1
,
(
FProcessItem
)
dndProcessSnodeQueue
)
!=
0
)
{
dError
(
"failed to start snode write worker since %s"
,
terrstr
());
return
-
1
;
}
return
0
;
}
static
void
dndStopSnodeWorker
(
SDnode
*
pDnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
taosWLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
deployed
=
0
;
taosWUnLockLatch
(
&
pMgmt
->
latch
);
while
(
pMgmt
->
refCount
>
1
)
{
taosMsleep
(
10
);
}
dndCleanupWorker
(
&
pMgmt
->
writeWorker
);
}
static
void
dndBuildSnodeOption
(
SDnode
*
pDnode
,
SSnodeOpt
*
pOption
)
{
pOption
->
pDnode
=
pDnode
;
pOption
->
sendMsgToDnodeFp
=
dndSendMsgToDnode
;
pOption
->
sendMsgToMnodeFp
=
dndSendMsgToMnode
;
pOption
->
sendRedirectMsgFp
=
dndSendRedirectMsg
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
cfg
.
sver
=
pDnode
->
opt
.
sver
;
}
static
int32_t
dndOpenSnode
(
SDnode
*
pDnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
SSnodeOpt
option
=
{
0
};
dndBuildSnodeOption
(
pDnode
,
&
option
);
SSnode
*
pSnode
=
sndOpen
(
pDnode
->
dir
.
snode
,
&
option
);
if
(
pSnode
==
NULL
)
{
dError
(
"failed to open snode since %s"
,
terrstr
());
return
-
1
;
}
if
(
dndStartSnodeWorker
(
pDnode
)
!=
0
)
{
dError
(
"failed to start snode worker since %s"
,
terrstr
());
sndClose
(
pSnode
);
return
-
1
;
}
if
(
dndWriteSnodeFile
(
pDnode
)
!=
0
)
{
dError
(
"failed to write snode file since %s"
,
terrstr
());
dndStopSnodeWorker
(
pDnode
);
sndClose
(
pSnode
);
return
-
1
;
}
taosWLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
pSnode
=
pSnode
;
pMgmt
->
deployed
=
1
;
taosWUnLockLatch
(
&
pMgmt
->
latch
);
dInfo
(
"snode open successfully"
);
return
0
;
}
static
int32_t
dndDropSnode
(
SDnode
*
pDnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
SSnode
*
pSnode
=
dndAcquireSnode
(
pDnode
);
if
(
pSnode
==
NULL
)
{
dError
(
"failed to drop snode since %s"
,
terrstr
());
return
-
1
;
}
taosRLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dropped
=
1
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
dndWriteSnodeFile
(
pDnode
)
!=
0
)
{
taosRLockLatch
(
&
pMgmt
->
latch
);
pMgmt
->
dropped
=
0
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
dndReleaseSnode
(
pDnode
,
pSnode
);
dError
(
"failed to drop snode since %s"
,
terrstr
());
return
-
1
;
}
dndReleaseSnode
(
pDnode
,
pSnode
);
dndStopSnodeWorker
(
pDnode
);
sndClose
(
pSnode
);
pMgmt
->
pSnode
=
NULL
;
sndDestroy
(
pDnode
->
dir
.
snode
);
return
0
;
}
int32_t
dndProcessCreateSnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
SCreateSnodeInMsg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_SNODE_ID_INVALID
;
return
-
1
;
}
else
{
return
dndOpenSnode
(
pDnode
);
}
}
int32_t
dndProcessDropSnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
SDropSnodeInMsg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_SNODE_ID_INVALID
;
return
-
1
;
}
else
{
return
dndDropSnode
(
pDnode
);
}
}
static
void
dndProcessSnodeQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
SRpcMsg
*
pRsp
=
NULL
;
int32_t
code
=
TSDB_CODE_DND_SNODE_NOT_DEPLOYED
;
SSnode
*
pSnode
=
dndAcquireSnode
(
pDnode
);
if
(
pSnode
!=
NULL
)
{
code
=
sndProcessMsg
(
pSnode
,
pMsg
,
&
pRsp
);
}
if
(
pRsp
!=
NULL
)
{
pRsp
->
ahandle
=
pMsg
->
ahandle
;
rpcSendResponse
(
pRsp
);
free
(
pRsp
);
}
else
{
if
(
code
!=
0
)
code
=
terrno
;
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
code
=
code
};
rpcSendResponse
(
&
rpcRsp
);
}
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
void
dndWriteSnodeMsgToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
TSDB_CODE_DND_SNODE_NOT_DEPLOYED
;
SSnode
*
pSnode
=
dndAcquireSnode
(
pDnode
);
if
(
pSnode
!=
NULL
)
{
code
=
dndWriteMsgToWorker
(
pWorker
,
pMsg
,
sizeof
(
SRpcMsg
));
}
dndReleaseSnode
(
pDnode
,
pSnode
);
if
(
code
!=
0
)
{
if
(
pMsg
->
msgType
&
1u
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
code
=
code
};
rpcSendResponse
(
&
rsp
);
}
rpcFreeCont
(
pMsg
->
pCont
);
}
}
void
dndProcessSnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
dndWriteSnodeMsgToWorker
(
pDnode
,
&
pDnode
->
smgmt
.
writeWorker
,
pMsg
);
}
int32_t
dndInitSnode
(
SDnode
*
pDnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
if
(
dndReadSnodeFile
(
pDnode
)
!=
0
)
{
return
-
1
;
}
if
(
pMgmt
->
dropped
)
return
0
;
if
(
!
pMgmt
->
deployed
)
return
0
;
return
dndOpenSnode
(
pDnode
);
}
void
dndCleanupSnode
(
SDnode
*
pDnode
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
if
(
pMgmt
->
pSnode
)
{
dndStopSnodeWorker
(
pDnode
);
sndClose
(
pMgmt
->
pSnode
);
pMgmt
->
pSnode
=
NULL
;
}
}
source/dnode/mgmt/impl/src/dndWorker.c
浏览文件 @
d60941d4
...
...
@@ -16,9 +16,9 @@
#define _DEFAULT_SOURCE
#include "dndWorker.h"
int32_t
dndInitWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
E
Dnd
WorkerType
type
,
const
char
*
name
,
int32_t
minNum
,
int32_t
maxNum
,
FProcessItem
f
p
)
{
if
(
pDnode
==
NULL
||
pWorker
==
NULL
||
name
==
NULL
||
minNum
<
0
||
maxNum
<=
0
||
f
p
==
NULL
)
{
int32_t
dndInitWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
EWorkerType
type
,
const
char
*
name
,
int32_t
minNum
,
int32_t
maxNum
,
void
*
queueF
p
)
{
if
(
pDnode
==
NULL
||
pWorker
==
NULL
||
name
==
NULL
||
minNum
<
0
||
maxNum
<=
0
||
queueF
p
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
-
1
;
}
...
...
@@ -27,19 +27,32 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type
pWorker
->
name
=
name
;
pWorker
->
minNum
=
minNum
;
pWorker
->
maxNum
=
maxNum
;
pWorker
->
fp
=
f
p
;
pWorker
->
queueFp
=
queueF
p
;
pWorker
->
pDnode
=
pDnode
;
if
(
pWorker
->
type
==
DND_WORKER_SINGLE
)
{
SWorkerPool
*
pPool
=
&
pWorker
->
pool
;
pPool
->
name
=
name
;
pPool
->
min
=
minNum
;
pPool
->
max
=
maxNum
;
if
(
tWorkerInit
(
pPool
)
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pWorker
->
queue
=
tWorkerAllocQueue
(
&
pPool
,
pDnode
,
fp
);
pWorker
->
queue
=
tWorkerAllocQueue
(
pPool
,
pDnode
,
(
FProcessItem
)
queueFp
);
if
(
pWorker
->
queue
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
}
else
if
(
pWorker
->
type
==
DND_WORKER_MULTI
)
{
SMWorkerPool
*
pPool
=
&
pWorker
->
mpool
;
pPool
->
name
=
name
;
pPool
->
max
=
maxNum
;
if
(
tMWorkerInit
(
pPool
)
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pWorker
->
queue
=
tMWorkerAllocQueue
(
pPool
,
pDnode
,
(
FProcessItems
)
queueFp
);
if
(
pWorker
->
queue
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -52,12 +65,17 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type
}
void
dndCleanupWorker
(
SDnodeWorker
*
pWorker
)
{
while
(
!
taosQueueEmpty
(
pWorker
->
queue
))
{
taosMsleep
(
10
);
}
if
(
pWorker
->
type
==
DND_WORKER_SINGLE
)
{
while
(
!
taosQueueEmpty
(
pWorker
->
queue
))
{
taosMsleep
(
10
);
}
tWorkerCleanup
(
&
pWorker
->
pool
);
tWorkerFreeQueue
(
&
pWorker
->
pool
,
pWorker
->
queue
);
}
else
if
(
pWorker
->
type
==
DND_WORKER_MULTI
)
{
tWorkerCleanup
(
&
pWorker
->
mpool
);
tMWorkerFreeQueue
(
&
pWorker
->
mpool
,
pWorker
->
queue
);
}
else
{
}
}
...
...
source/dnode/snode/src/snode.c
浏览文件 @
d60941d4
...
...
@@ -15,7 +15,7 @@
#include "sndInt.h"
SSnode
*
sndOpen
(
const
SSnodeOpt
*
pOption
)
{
SSnode
*
sndOpen
(
const
char
*
path
,
const
SSnodeOpt
*
pOption
)
{
SSnode
*
pSnode
=
calloc
(
1
,
sizeof
(
SSnode
));
return
pSnode
;
}
...
...
@@ -28,3 +28,5 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
*
pRsp
=
NULL
;
return
0
;
}
void
sndDestroy
(
const
char
*
path
)
{}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录