提交 359482be 编写于 作者: S Shengliang Guan

shm

上级 143a3113
...@@ -45,6 +45,7 @@ extern SDiskSpace tsTempSpace; ...@@ -45,6 +45,7 @@ extern SDiskSpace tsTempSpace;
void osInit(); void osInit();
void osUpdate(); void osUpdate();
void osCleanup();
bool osLogSpaceAvailable(); bool osLogSpaceAvailable();
void osSetTimezone(const char *timezone); void osSetTimezone(const char *timezone);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
static int8_t once = DND_ENV_INIT; static int8_t once = DND_ENV_INIT;
int32_t dndInit() { int32_t dndInit() {
dDebug("start to init dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT; terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr()); dError("failed to init dnode env since %s", terrstr());
...@@ -51,6 +52,7 @@ int32_t dndInit() { ...@@ -51,6 +52,7 @@ int32_t dndInit() {
} }
void dndCleanup() { void dndCleanup() {
dDebug("start to cleanup dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up"); dError("dnode env is already cleaned up");
return; return;
...@@ -121,5 +123,6 @@ TdFilePtr dndCheckRunning(char *dataDir) { ...@@ -121,5 +123,6 @@ TdFilePtr dndCheckRunning(char *dataDir) {
return NULL; return NULL;
} }
dDebug("file:%s is locked", filepath);
return pFile; return pFile;
} }
...@@ -45,7 +45,11 @@ static bool dndRequireNode(SMgmtWrapper *pMgmt) { ...@@ -45,7 +45,11 @@ static bool dndRequireNode(SMgmtWrapper *pMgmt) {
static int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); } static int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); }
static void dndCloseNode(SMgmtWrapper *pWrapper) { (*pWrapper->fp.closeFp)(pWrapper); } static void dndCloseNode(SMgmtWrapper *pWrapper) {
if (pWrapper->required) {
(*pWrapper->fp.closeFp)(pWrapper);
}
}
static void dndClearMemory(SDnode *pDnode) { static void dndClearMemory(SDnode *pDnode) {
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
...@@ -243,7 +247,7 @@ int32_t dndRun(SDnode *pDnode) { ...@@ -243,7 +247,7 @@ int32_t dndRun(SDnode *pDnode) {
while (1) { while (1) {
if (pDnode->event == DND_EVENT_STOP) { if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode object receive stop event"); dInfo("dnode is about to stop");
break; break;
} }
taosMsleep(100); taosMsleep(100);
...@@ -302,8 +306,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -302,8 +306,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER; goto _OVER;
} }
dTrace("msg:%p, is created, app:%p RPC:%p user:%s, processd by %s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user, dTrace("msg:%p, is created, user:%s", pMsg, pMsg->user);
pWrapper->name);
code = (*msgFp)(pWrapper, pMsg); code = (*msgFp)(pWrapper, pMsg);
_OVER: _OVER:
......
...@@ -30,18 +30,18 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { ...@@ -30,18 +30,18 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
if (pRsp == NULL || pRsp->pCont == NULL) return; if (pRsp == NULL || pRsp->pCont == NULL) return;
dTrace("RPC %p, rsp:%s ignored since dnode exiting, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle); dTrace("rsp:%s ignored since dnode exiting, app:%p", TMSG_INFO(msgType), pRsp->ahandle);
rpcFreeCont(pRsp->pCont); rpcFreeCont(pRsp->pCont);
return; return;
} }
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (pHandle->msgFp != NULL) { if (pHandle->msgFp != NULL) {
dTrace("RPC %p, rsp:%s will be processed by %s, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType), dTrace("rsp:%s will be processed by %s, code:0x%x app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
pHandle->pWrapper->name, pRsp->code & 0XFFFF, pRsp->ahandle); pRsp->code & 0XFFFF, pRsp->ahandle);
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
} else { } else {
dError("RPC %p, rsp:%s not processed, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle); dError("rsp:%s not processed, app:%p", TMSG_INFO(msgType), pRsp->ahandle);
rpcFreeCont(pRsp->pCont); rpcFreeCont(pRsp->pCont);
} }
} }
...@@ -258,12 +258,13 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { ...@@ -258,12 +258,13 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
if (pHandle->msgFp != NULL) { if (pHandle->msgFp != NULL) {
dError("msg:%s, has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex], dError("msg:%s has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex],
pHandle->pWrapper->name, pWrapper->name); pHandle->pWrapper->name, pWrapper->name);
return -1; return -1;
} else { } else {
dDebug("msg:%s, will be processed by node:%s", tMsgInfo[msgIndex], pWrapper->name); dTrace("msg:%s will be processed by %s", tMsgInfo[msgIndex], pWrapper->name);
pHandle->msgFp = msgFp; pHandle->msgFp = msgFp;
pHandle->pWrapper = pWrapper;
} }
} }
} }
......
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmFile.h" #include "dmFile.h"
static void dndPrintDnodes(SDnodeMgmt *pMgmt); static void dmPrintDnodes(SDnodeMgmt *pMgmt);
static bool dndIsEpChanged(SDnodeMgmt *pMgmt, const char *ep); static bool dmIsEpChanged(SDnodeMgmt *pMgmt, const char *ep);
static void dndResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps); static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
int32_t dmReadFile(SDnodeMgmt *pMgmt) { int32_t dmReadFile(SDnodeMgmt *pMgmt) {
int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
...@@ -130,14 +130,14 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) { ...@@ -130,14 +130,14 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) {
code = 0; code = 0;
dInfo("succcessed to read file %s", file); dInfo("succcessed to read file %s", file);
dndPrintDnodes(pMgmt); dmPrintDnodes(pMgmt);
PRASE_DNODE_OVER: PRASE_DNODE_OVER:
if (content != NULL) free(content); if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);
if (dndIsEpChanged(pMgmt, pMgmt->pDnode->cfg.localEp)) { if (dmIsEpChanged(pMgmt, pMgmt->pDnode->cfg.localEp)) {
dError("localEp %s different with %s and need reconfigured", pMgmt->pDnode->cfg.localEp, file); dError("localEp %s different with %s and need reconfigured", pMgmt->pDnode->cfg.localEp, file);
return -1; return -1;
} }
...@@ -149,7 +149,7 @@ PRASE_DNODE_OVER: ...@@ -149,7 +149,7 @@ PRASE_DNODE_OVER:
taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); taosArrayPush(pMgmt->pDnodeEps, &dnodeEp);
} }
dndResetDnodes(pMgmt, pMgmt->pDnodeEps); dmResetDnodes(pMgmt, pMgmt->pDnodeEps);
terrno = 0; terrno = 0;
return 0; return 0;
...@@ -218,12 +218,12 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { ...@@ -218,12 +218,12 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
if (numOfEps != numOfEpsOld) { if (numOfEps != numOfEpsOld) {
dndResetDnodes(pMgmt, pDnodeEps); dmResetDnodes(pMgmt, pDnodeEps);
dmWriteFile(pMgmt); dmWriteFile(pMgmt);
} else { } else {
int32_t size = numOfEps * sizeof(SDnodeEp); int32_t size = numOfEps * sizeof(SDnodeEp);
if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) { if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) {
dndResetDnodes(pMgmt, pDnodeEps); dmResetDnodes(pMgmt, pDnodeEps);
dmWriteFile(pMgmt); dmWriteFile(pMgmt);
} }
} }
...@@ -231,7 +231,7 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { ...@@ -231,7 +231,7 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
} }
static void dndResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
if (pMgmt->pDnodeEps != pDnodeEps) { if (pMgmt->pDnodeEps != pDnodeEps) {
SArray *tmp = pMgmt->pDnodeEps; SArray *tmp = pMgmt->pDnodeEps;
pMgmt->pDnodeEps = taosArrayDup(pDnodeEps); pMgmt->pDnodeEps = taosArrayDup(pDnodeEps);
...@@ -259,10 +259,10 @@ static void dndResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { ...@@ -259,10 +259,10 @@ static void dndResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
} }
dndPrintDnodes(pMgmt); dmPrintDnodes(pMgmt);
} }
static void dndPrintDnodes(SDnodeMgmt *pMgmt) { static void dmPrintDnodes(SDnodeMgmt *pMgmt) {
int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
dDebug("print dnode ep list, num:%d", numOfEps); dDebug("print dnode ep list, num:%d", numOfEps);
for (int32_t i = 0; i < numOfEps; i++) { for (int32_t i = 0; i < numOfEps; i++) {
...@@ -271,7 +271,7 @@ static void dndPrintDnodes(SDnodeMgmt *pMgmt) { ...@@ -271,7 +271,7 @@ static void dndPrintDnodes(SDnodeMgmt *pMgmt) {
} }
} }
static bool dndIsEpChanged(SDnodeMgmt *pMgmt, const char *ep) { static bool dmIsEpChanged(SDnodeMgmt *pMgmt, const char *ep) {
bool changed = false; bool changed = false;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
......
...@@ -141,11 +141,11 @@ int32_t dmInit(SMgmtWrapper *pWrapper) { ...@@ -141,11 +141,11 @@ int32_t dmInit(SMgmtWrapper *pWrapper) {
} }
pWrapper->pMgmt = pMgmt; pWrapper->pMgmt = pMgmt;
dInfo("dnode-mgmt is initialized");
dndSetStatus(pDnode, DND_STAT_RUNNING); dndSetStatus(pDnode, DND_STAT_RUNNING);
dmSendStatusReq(pMgmt); dmSendStatusReq(pMgmt);
dndReportStartup(pDnode, "TDengine", "initialized successfully"); dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("dnode-mgmt is initialized");
return 0; return 0;
} }
......
...@@ -54,11 +54,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -54,11 +54,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
pMgmt->statusSent = 1; pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status req to mnode", pDnode); dTrace("send status req to mnode, ahandle:%p", rpcMsg.ahandle);
dndSendReqToMnode(pMgmt->pDnode, &rpcMsg); dndSendReqToMnode(pMgmt->pDnode, &rpcMsg);
} }
static void dndUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
if (pMgmt->dnodeId == 0) { if (pMgmt->dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
...@@ -70,7 +70,7 @@ static void dndUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { ...@@ -70,7 +70,7 @@ static void dndUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
} }
void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
SDnodeMgmt *pMgmt = dndGetWrapper(pDnode, MNODE)->pMgmt; SDnodeMgmt *pMgmt = dndGetWrapper(pDnode, DNODE)->pMgmt;
if (pRsp->code != TSDB_CODE_SUCCESS) { if (pRsp->code != TSDB_CODE_SUCCESS) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
...@@ -83,7 +83,7 @@ void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { ...@@ -83,7 +83,7 @@ void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
if (pRsp->pCont != NULL && pRsp->contLen != 0 && if (pRsp->pCont != NULL && pRsp->contLen != 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->dver = statusRsp.dver; pMgmt->dver = statusRsp.dver;
dndUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps); dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps);
} }
taosArrayDestroy(statusRsp.pDnodeEps); taosArrayDestroy(statusRsp.pDnodeEps);
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include "smInt.h" #include "smInt.h"
#include "vmInt.h" #include "vmInt.h"
static void *dnodeThreadRoutine(void *param) { static void *dmThreadRoutine(void *param) {
SDnodeMgmt *pMgmt = param; SDnodeMgmt *pMgmt = param;
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pMgmt->pDnode;
int64_t lastStatusTime = taosGetTimestampMs(); int64_t lastStatusTime = taosGetTimestampMs();
...@@ -54,8 +54,10 @@ static void *dnodeThreadRoutine(void *param) { ...@@ -54,8 +54,10 @@ static void *dnodeThreadRoutine(void *param) {
} }
} }
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) {
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = &pNodeMsg->rpcMsg;
dTrace("msg:%p, will be processed in mgmt queue", pNodeMsg);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_DND_CREATE_MNODE: case TDMT_DND_CREATE_MNODE:
...@@ -127,21 +129,23 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -127,21 +129,23 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
taosFreeQitem(pMsg); taosFreeQitem(pNodeMsg);
} }
int32_t dmStartWorker(SDnodeMgmt *pMgmt) { int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt->pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dndProcessMgmtQueue) != 0) { if (dndInitWorker(pMgmt->pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessMgmtQueue) !=
0) {
dError("failed to start dnode mgmt worker since %s", terrstr()); dError("failed to start dnode mgmt worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitWorker(pMgmt->pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) { if (dndInitWorker(pMgmt->pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1,
dmProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr()); dError("failed to start dnode mgmt worker since %s", terrstr());
return -1; return -1;
} }
pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pMgmt); pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt);
if (pMgmt->threadId == NULL) { if (pMgmt->threadId == NULL) {
dError("failed to init dnode thread"); dError("failed to init dnode thread");
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -169,5 +173,6 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -169,5 +173,6 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
pWorker = &pMgmt->statusWorker; pWorker = &pMgmt->statusWorker;
} }
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg)); return dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg));
} }
\ No newline at end of file
...@@ -85,6 +85,7 @@ static int32_t dndRunDnode() { ...@@ -85,6 +85,7 @@ static int32_t dndRunDnode() {
int32_t code = dndRun(pDnode); int32_t code = dndRun(pDnode);
dInfo("start shutting down the TDengine service"); dInfo("start shutting down the TDengine service");
global.pDnode = NULL;
dndClose(pDnode); dndClose(pDnode);
dndCleanup(); dndCleanup();
taosCloseLog(); taosCloseLog();
......
...@@ -101,6 +101,8 @@ void osUpdate() { ...@@ -101,6 +101,8 @@ void osUpdate() {
} }
} }
void osCleanup() {}
bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; } bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; }
void osSetTimezone(const char *timezone) { taosSetSystemTimezone(tsTimezone, tsTimezone, &tsDaylight); } void osSetTimezone(const char *timezone) { taosSetSystemTimezone(tsTimezone, tsTimezone, &tsDaylight); }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册