提交 823691b6 编写于 作者: S Shengliang Guan

refactor: node mgmt

上级 5669f2a1
......@@ -36,7 +36,7 @@ typedef struct SBnodeMgmt {
// bmHandle.c
SArray *bmGetMsgHandles();
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg);
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq);
......
......@@ -43,7 +43,7 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq) {
return 0;
}
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateBnodeReq createReq = {0};
......@@ -52,14 +52,14 @@ int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
if (pMgmt->dnodeId != 0 && createReq.dnodeId != pMgmt->dnodeId) {
if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pMgmt->dnodeId);
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pInput->dnodeId);
return -1;
}
bool deployed = true;
if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) {
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write bnode file since %s", terrstr());
return -1;
}
......@@ -76,7 +76,7 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
if (dropReq.dnodeId != pMgmt->dnodeId) {
if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr());
return -1;
......
......@@ -27,8 +27,8 @@ typedef struct SDnodeMgmt {
SMsgCb msgCb;
const char *path;
const char *name;
TdThread *statusThreadId;
TdThread *monitorThreadId;
TdThread statusThread;
TdThread monitorThread;
SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp;
......
......@@ -27,6 +27,7 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
}
static void dmStopMgmt(SDnodeMgmt *pMgmt) {
pMgmt->data.stopped = true;
dmStopMonitorThread(pMgmt);
dmStopStatusThread(pMgmt);
}
......
......@@ -24,8 +24,7 @@ static void *dmStatusThreadFp(void *param) {
while (1) {
taosMsleep(200);
taosThreadTestCancel();
if (pMgmt->data.dropped) continue;
if (pMgmt->data.dropped || pMgmt->data.stopped) break;
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
......@@ -46,8 +45,7 @@ static void *dmMonitorThreadFp(void *param) {
while (1) {
taosMsleep(200);
taosThreadTestCancel();
if (pMgmt->data.dropped) continue;
if (pMgmt->data.dropped || pMgmt->data.stopped) break;
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
......@@ -61,40 +59,42 @@ static void *dmMonitorThreadFp(void *param) {
}
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
pMgmt->statusThreadId = taosCreateThread(dmStatusThreadFp, pMgmt);
if (pMgmt->statusThreadId == NULL) {
dError("failed to init dnode status thread");
terrno = TSDB_CODE_OUT_OF_MEMORY;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
dError("failed to create status thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-status", "initialized");
return 0;
}
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
if (pMgmt->statusThreadId != NULL) {
taosDestoryThread(pMgmt->statusThreadId);
pMgmt->statusThreadId = NULL;
if (taosCheckPthreadValid(pMgmt->statusThread)) {
taosThreadJoin(pMgmt->statusThread, NULL);
}
}
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
pMgmt->monitorThreadId = taosCreateThread(dmMonitorThreadFp, pMgmt);
if (pMgmt->monitorThreadId == NULL) {
dError("failed to init dnode monitor thread");
terrno = TSDB_CODE_OUT_OF_MEMORY;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
dError("failed to create monitor thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-monitor", "initialized");
return 0;
}
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
if (pMgmt->monitorThreadId != NULL) {
taosDestoryThread(pMgmt->monitorThreadId);
pMgmt->monitorThreadId = NULL;
if (taosCheckPthreadValid(pMgmt->monitorThread)) {
taosThreadJoin(pMgmt->monitorThread, NULL);
}
}
......
......@@ -48,7 +48,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
// mmHandle.c
SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg);
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq);
......
......@@ -124,21 +124,19 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
if (pReq != NULL || pMgmt != NULL) {
int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica);
for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
if (pReq != NULL) {
pReplica = &pReq->replicas[i];
}
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
}
int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica);
for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
if (pReq != NULL) {
pReplica = &pReq->replicas[i];
}
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
}
}
......
......@@ -72,7 +72,7 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) {
return 0;
}
int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateMnodeReq createReq = {0};
......@@ -81,14 +81,18 @@ int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
if (createReq.replica <= 1 || createReq.dnodeId != pMgmt->dnodeId) {
if (createReq.replica <= 1 || (createReq.dnodeId != pInput->dnodeId && pInput->dnodeId != 0)) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr());
return -1;
}
bool deployed = true;
if (mmWriteFile(pMgmt, &createReq, deployed) != 0) {
SMnodeMgmt mgmt = {0};
mgmt.path = pInput->path;
mgmt.name = pInput->name;
if (mmWriteFile(&mgmt, &createReq, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
......@@ -105,7 +109,7 @@ int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
if (dropReq.dnodeId != pMgmt->dnodeId) {
if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr());
return -1;
......
......@@ -37,7 +37,7 @@ typedef struct SQnodeMgmt {
// qmHandle.c
SArray *qmGetMsgHandles();
int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq);
......
......@@ -43,7 +43,7 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq) {
return 0;
}
int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateQnodeReq createReq = {0};
......@@ -52,14 +52,14 @@ int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
if (createReq.dnodeId != pMgmt->dnodeId) {
if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr());
return -1;
}
bool deployed = true;
if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) {
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write qnode file since %s", terrstr());
return -1;
}
......@@ -76,7 +76,7 @@ int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
if (dropReq.dnodeId != pMgmt->dnodeId) {
if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr());
return -1;
......
......@@ -39,7 +39,7 @@ typedef struct SSnodeMgmt {
// smHandle.c
SArray *smGetMsgHandles();
int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg);
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq);
......
......@@ -43,7 +43,7 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq) {
return 0;
}
int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateSnodeReq createReq = {0};
......@@ -52,14 +52,14 @@ int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
if (createReq.dnodeId != pMgmt->dnodeId) {
if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create snode since %s", terrstr());
return -1;
}
bool deployed = true;
if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) {
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write snode file since %s", terrstr());
return -1;
}
......@@ -76,7 +76,7 @@ int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
if (dropReq.dnodeId != pMgmt->dnodeId) {
if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr());
return -1;
......
......@@ -320,7 +320,12 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs
return -1;
}
int32_t code = (*pWrapper->func.createFp)(pWrapper, pMsg);
SMgmtInputOpt *pInput = &pWrapper->pDnode->input;
pInput->name = pWrapper->name;
pInput->path = pWrapper->path;
pInput->msgCb = dmGetMsgcb(pWrapper);
int32_t code = (*pWrapper->func.createFp)(pInput, pMsg);
if (code != 0) {
dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
} else {
......@@ -345,7 +350,7 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg)
taosThreadMutexLock(&pDnode->mutex);
int32_t code = (*pWrapper->func.dropFp)(pWrapper, pMsg);
int32_t code = (*pWrapper->func.dropFp)(pWrapper->pMgmt, pMsg);
if (code != 0) {
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
} else {
......
......@@ -181,6 +181,7 @@ int32_t dmStartNode(SMgmtWrapper *pWrapper) {
void dmStopNode(SMgmtWrapper *pWrapper) {
if (pWrapper->func.stopFp != NULL && pWrapper->pMgmt != NULL) {
(*pWrapper->func.stopFp)(pWrapper->pMgmt);
dDebug("node:%s, has been stopped", pWrapper->name);
}
}
......
......@@ -257,13 +257,21 @@ static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
}
static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
if (pDnode->status != DND_STAT_RUNNING) {
pRsp->code = TSDB_CODE_NODE_OFFLINE;
} else {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
}
}
static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) {
SEpSet epSet = {0};
dmGetMnodeEpSet(pWrapper->pDnode, &epSet);
dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp);
if (pWrapper->pDnode->status != DND_STAT_RUNNING) {
pRsp->code = TSDB_CODE_NODE_OFFLINE;
} else {
SEpSet epSet = {0};
dmGetMnodeEpSet(pWrapper->pDnode, &epSet);
dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp);
}
}
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
......
......@@ -54,10 +54,10 @@ extern "C" {
typedef enum {
DNODE = 0,
VNODE = 1,
QNODE = 2,
SNODE = 3,
MNODE = 4,
MNODE = 1,
VNODE = 2,
QNODE = 3,
SNODE = 4,
BNODE = 5,
NODE_END = 6,
} EDndNodeType;
......@@ -117,7 +117,7 @@ typedef int32_t (*NodeOpenFp)(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutp
typedef void (*NodeCloseFp)(void *pMgmt);
typedef int32_t (*NodeStartFp)(void *pMgmt);
typedef void (*NodeStopFp)(void *pMgmt);
typedef int32_t (*NodeCreateFp)(void *pMgmt, SNodeMsg *pMsg);
typedef int32_t (*NodeCreateFp)(const SMgmtInputOpt *pInput, SNodeMsg *pMsg);
typedef int32_t (*NodeDropFp)(void *pMgmt, SNodeMsg *pMsg);
typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required);
typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle
......@@ -167,6 +167,7 @@ typedef struct {
ESyncState vndState;
ESyncState mndState;
bool dropped;
bool stopped;
SEpSet mnodeEps;
SArray *dnodeEps;
SHashObj *dnodeHash;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册