提交 4fa5121c 编写于 作者: S Shengliang Guan

adjust refcount

上级 8058a0f1
......@@ -180,6 +180,10 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg));
if (pCfg->level == 0 && pCfg->primary == 1) {
tstrncpy(tsDataDir, pCfg->dir, PATH_MAX);
if (taosMkDir(tsDataDir) != 0) {
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
return -1;
}
}
if (taosMkDir(pCfg->dir) != 0) {
uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
......
......@@ -77,7 +77,11 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
return -1;
}
dDebug("bnode workers are initialized");
return 0;
}
void bmStopWorker(SBnodeMgmt *pMgmt) { tMultiWorkerCleanup(&pMgmt->writeWorker); }
void bmStopWorker(SBnodeMgmt *pMgmt) {
tMultiWorkerCleanup(&pMgmt->writeWorker);
dDebug("bnode workers are closed");
}
......@@ -130,12 +130,11 @@ typedef struct SDnode {
SMgmtWrapper wrappers[NODE_MAX];
} SDnode;
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId);
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndSendMonitorReport(SDnode *pDnode);
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId);
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndSendMonitorReport(SDnode *pDnode);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
......@@ -145,6 +144,10 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}
#endif
......
......@@ -50,10 +50,6 @@ SDnode *dndCreate(const SDnodeOpt *pOption);
void dndClose(SDnode *pDnode);
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
// dndTransport.c
int32_t dndInitServer(SDnode *pDnode);
void dndCleanupServer(SDnode *pDnode);
......
......@@ -22,7 +22,12 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
return vmMonitorTfsInfo(dndAcquireWrapper(pDnode, VNODES), pInfo);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) {
vmMonitorTfsInfo(pWrapper, pInfo);
dndReleaseWrapper(pWrapper);
}
return 0;
}
static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
......@@ -45,8 +50,17 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
taosGetCardInfo(&pInfo->net_in, &pInfo->net_out);
taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
vmMonitorVnodeReqs(dndAcquireWrapper(pDnode, VNODES), pInfo);
pInfo->has_mnode = (dndAcquireWrapper(pDnode, MNODE)->required);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) {
vmMonitorVnodeReqs(pWrapper, pInfo);
dndReleaseWrapper(pWrapper);
}
pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) {
pInfo->has_mnode = pWrapper->required;
dndReleaseWrapper(pWrapper);
}
}
void dndSendMonitorReport(SDnode *pDnode) {
......@@ -63,10 +77,15 @@ void dndSendMonitorReport(SDnode *pDnode) {
SMonClusterInfo clusterInfo = {0};
SMonVgroupInfo vgroupInfo = {0};
SMonGrantInfo grantInfo = {0};
if (mmMonitorMnodeInfo(dndAcquireWrapper(pDnode, MNODE), &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
monSetClusterInfo(pMonitor, &clusterInfo);
monSetVgroupInfo(pMonitor, &vgroupInfo);
monSetGrantInfo(pMonitor, &grantInfo);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) {
if (mmMonitorMnodeInfo(pWrapper, &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
monSetClusterInfo(pMonitor, &clusterInfo);
monSetVgroupInfo(pMonitor, &vgroupInfo);
monSetGrantInfo(pMonitor, &grantInfo);
}
dndReleaseWrapper(pWrapper);
}
SMonDnodeInfo dnodeInfo = {0};
......
......@@ -20,8 +20,8 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet);
dndReleaseWrapper(pWrapper);
}
dndReleaseWrapper(pWrapper);
}
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
......
......@@ -146,9 +146,14 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
STransMgmt *pMgmt = &pDnode->trans;
SEpSet epSet = {0};
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
dndReleaseWrapper(pWrapper);
}
SEpSet epSet = {0};
dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
......@@ -182,9 +187,14 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return 0;
}
if (mmGetUserAuth(dndAcquireWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) {
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0;
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) {
if (mmGetUserAuth(pWrapper, user, spi, encrypt, secret, ckey) == 0) {
dndReleaseWrapper(pWrapper);
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0;
}
dndReleaseWrapper(pWrapper);
}
if (terrno != TSDB_CODE_APP_NOT_READY) {
......@@ -328,7 +338,12 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode;
STransMgmt *pTrans = &pDnode->trans;
SEpSet epSet = {0};
dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
dndReleaseWrapper(pWrapper);
}
return dndSendRpcReq(pTrans, &epSet, pReq);
}
}
......@@ -336,7 +351,12 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
if (pDnodeWrapper != NULL) {
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
dndReleaseWrapper(pDnodeWrapper);
} else {
rpcSendResponse(pRsp);
}
} else {
rpcSendResponse(pRsp);
}
......
......@@ -209,7 +209,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
}
pMgmt->updateTime = taosGetTimestampMs();
dDebug("successed to write %s", file);
dDebug("successed to write %s", realfile);
return 0;
}
......
......@@ -74,14 +74,14 @@ void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
static int32_t dmStart(SMgmtWrapper *pWrapper) {
dDebug("dnode mgmt start to run");
dDebug("dnode-mgmt start to run");
return dmStartThread(pWrapper->pMgmt);
}
int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt));
dInfo("dnode-mgmt is initialized");
dInfo("dnode-mgmt start to init");
pDnode->dnodeId = 0;
pDnode->dropped = 0;
......
......@@ -41,8 +41,12 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch);
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
vmMonitorVnodeLoads(dndAcquireWrapper(pDnode, VNODES), req.pVloads);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) {
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
vmMonitorVnodeLoads(pWrapper, req.pVloads);
dndReleaseWrapper(pWrapper);
}
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
......
......@@ -114,6 +114,7 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
return -1;
}
dDebug("dnode workers are initialized");
return 0;
}
......@@ -136,6 +137,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
taosDestoryThread(pMgmt->threadId);
pMgmt->threadId = NULL;
}
dDebug("dnode workers are closed");
}
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......@@ -144,6 +146,6 @@ int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
pWorker = &pMgmt->statusWorker;
}
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
}
......@@ -227,7 +227,7 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
}
static int32_t mmStart(SMgmtWrapper *pWrapper) {
dDebug("mnode mgmt start to run");
dDebug("mnode-mgmt start to run");
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndStart(pMgmt->pMnode);
}
......
......@@ -108,6 +108,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
return -1;
}
dDebug("mnode workers are initialized");
return 0;
}
......@@ -115,4 +116,5 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker);
dDebug("mnode workers are closed");
}
......@@ -132,10 +132,12 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
return -1;
}
dDebug("qnode workers are initialized");
return 0;
}
void qmStopWorker(SQnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->fetchWorker);
dDebug("qnode workers are closed");
}
......@@ -80,6 +80,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return -1;
}
dDebug("snode workers are initialized");
return 0;
}
......@@ -90,6 +91,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
}
taosArrayDestroy(pMgmt->uniqueWorkers);
tSingleWorkerCleanup(&pMgmt->sharedWorker);
dDebug("snode workers are closed");
}
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
......
......@@ -257,14 +257,14 @@ static void vmCleanup(SMgmtWrapper *pWrapper) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("vnodes-mgmt start to cleanup");
dInfo("vnode-mgmt start to cleanup");
vmCloseVnodes(pMgmt);
vmStopWorker(pMgmt);
vnodeCleanup();
// walCleanUp();
free(pMgmt);
pWrapper->pMgmt = NULL;
dInfo("vnodes-mgmt is cleaned up");
dInfo("vnode-mgmt is cleaned up");
}
static int32_t vmInit(SMgmtWrapper *pWrapper) {
......@@ -272,7 +272,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt));
int32_t code = -1;
dInfo("vnodes-mgmt start to init");
dInfo("vnode-mgmt start to init");
if (pMgmt == NULL) goto _OVER;
pMgmt->path = pWrapper->path;
......@@ -312,7 +312,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
}
if (vmOpenVnodes(pMgmt) != 0) {
dError("failed to open vnodes since %s", terrstr());
dError("failed to open vnode since %s", terrstr());
return -1;
}
......
......@@ -356,7 +356,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
return -1;
}
dDebug("vnode workers is initialized");
dDebug("vnode workers are initialized");
return 0;
}
......@@ -366,5 +366,5 @@ void vmStopWorker(SVnodesMgmt *pMgmt) {
tQWorkerCleanup(&pMgmt->queryPool);
tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode workers is closed");
dDebug("vnode workers are closed");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册