提交 06a2b831 编写于 作者: S Shengliang Guan

shm

上级 326459ff
...@@ -33,7 +33,6 @@ typedef struct { ...@@ -33,7 +33,6 @@ typedef struct {
} SSnodeLoad; } SSnodeLoad;
typedef struct { typedef struct {
int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
SMgmtWrapper *pWrapper; SMgmtWrapper *pWrapper;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_SNODE_H_
#define _TD_DND_SNODE_H_
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
#endif
void bmGetMgmtFp(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_SNODE_H_*/
\ No newline at end of file
...@@ -16,47 +16,37 @@ ...@@ -16,47 +16,37 @@
#ifndef _TD_DND_SNODE_INT_H_ #ifndef _TD_DND_SNODE_INT_H_
#define _TD_DND_SNODE_INT_H_ #define _TD_DND_SNODE_INT_H_
#include "dnd.h" #include "sm.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct SSnodeMgmt { typedef struct SSnodeMgmt {
int32_t refCount;
int8_t deployed;
int8_t dropped;
int8_t uniqueWorkerInUse;
SSnode *pSnode; SSnode *pSnode;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SRWLatch latch; SRWLatch latch;
int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SDnodeWorker*> SArray *uniqueWorkers; // SArray<SDnodeWorker*>
SDnodeWorker sharedWorker; SDnodeWorker sharedWorker;
} SSnodeMgmt; } SSnodeMgmt;
void smGetMgmtFp(SMgmtWrapper *pMgmt); // smInt.c
int32_t smOpen(SMgmtWrapper *pWrapper);
int32_t dndInitSnode(SDnode *pDnode); int32_t smDrop(SMgmtWrapper *pWrapper);
void dndCleanupSnode(SDnode *pDnode);
void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
// smMsg.c
void smInitMsgHandles(SMgmtWrapper *pWrapper); void smInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t smStartWorker(SDnode *pDnode); int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void smStopWorker(SDnode *pDnode);
void smInitMsgFp(SMnodeMgmt *pMgmt); // smWorker.c
void smProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t smStartWorker(SQnodeMgmt *pMgmt);
int32_t smPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); void smStopWorker(SQnodeMgmt *pMgmt);
int32_t smPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t smProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
void smConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); int32_t smProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
void smConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void smProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void smProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void smProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -18,13 +18,110 @@ ...@@ -18,13 +18,110 @@
static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
}
static int32_t smOpenImp(SSnodeMgmt *pMgmt) {
SSnodeOpt option = {0};
smInitOption(pMgmt, &option);
pMgmt->pSnode = sndOpen(pMgmt->path, &option);
if (pMgmt->pSnode == NULL) {
dError("failed to open snode since %s", terrstr());
return -1;
}
if (smStartWorker(pMgmt) != 0) {
dError("failed to start snode worker since %s", terrstr());
return -1;
}
bool deployed = true;
if (dndWriteFile(pMgmt->pWrapper, deployed) != 0) {
dError("failed to write snode file since %s", terrstr());
return -1;
}
return 0;
}
static void smCloseImp(SSnodeMgmt *pMgmt) {
if (pMgmt->pSnode != NULL) {
smStopWorker(pMgmt);
sndClose(pMgmt->pSnode);
pMgmt->pSnode = NULL;
}
}
int32_t smDrop(SMgmtWrapper *pWrapper) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return 0;
dInfo("snode-mgmt start to drop");
bool deployed = false;
if (dndWriteFile(pWrapper, deployed) != 0) {
dError("failed to drop snode since %s", terrstr());
return -1;
}
smCloseImp(pMgmt);
taosRemoveDir(pMgmt->path);
pWrapper->pMgmt = NULL;
free(pMgmt);
dInfo("snode-mgmt is dropped");
return 0;
}
static void smClose(SMgmtWrapper *pWrapper) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("snode-mgmt start to cleanup");
smCloseImp(pMgmt);
pWrapper->pMgmt = NULL;
free(pMgmt);
dInfo("snode-mgmt is cleaned up");
}
int32_t smOpen(SMgmtWrapper *pWrapper) {
dInfo("snode-mgmt start to init");
SSnodeMgmt *pMgmt = calloc(1, sizeof(SSnodeMgmt));
if (pMgmt == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pWrapper->pDnode;
pMgmt->pWrapper = pWrapper;
pWrapper->pMgmt = pMgmt;
int32_t code = smOpenImp(pMgmt);
if (code != 0) {
dError("failed to init snode-mgmt since %s", terrstr());
smClose(pWrapper);
} else {
dInfo("snode-mgmt is initialized");
}
return code;
}
void smGetMgmtFp(SMgmtWrapper *pWrapper) { void smGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0}; SMgmtFp mgmtFp = {0};
mgmtFp.openFp = NULL; mgmtFp.openFp = smOpen;
mgmtFp.closeFp = NULL; mgmtFp.closeFp = smClose;
mgmtFp.createMsgFp = smProcessCreateReq;
mgmtFp.dropMsgFp = smProcessDropReq;
mgmtFp.requiredFp = smRequire; mgmtFp.requiredFp = smRequire;
// smInitMsgHandles(pWrapper); smInitMsgHandles(pWrapper);
pWrapper->name = "snode"; pWrapper->name = "snode";
pWrapper->fp = mgmtFp; pWrapper->fp = mgmtFp;
} }
...@@ -16,8 +16,42 @@ ...@@ -16,8 +16,42 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "smInt.h" #include "smInt.h"
int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;} int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;} SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
void smInitMsgHandles(SMgmtWrapper *pWrapper) { SDCreateSnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_NODE_INVALID_OPTION;
dError("failed to create snode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->dnodeId);
return -1;
} else {
return smOpen(pWrapper);
}
}
int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDDropSnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (dropReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_NODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr());
return -1;
} else {
return smDrop(pWrapper);
}
} }
void smInitMsgHandles(SMgmtWrapper *pWrapper) {}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册