提交 84261149 编写于 作者: S Shengliang Guan

refactor: multi process mode

上级 b7b18e47
...@@ -74,19 +74,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -74,19 +74,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SMonVloadInfo vinfo = {0}; SMonVloadInfo vinfo = {0};
dmGetVnodeLoads(pMgmt, &vinfo); dmGetVnodeLoads(pMgmt, &vinfo);
req.pVloads = vinfo.pVloads; req.pVloads = vinfo.pVloads;
pMgmt->pData->unsyncedVgId = 0;
pMgmt->pData->vndState = TAOS_SYNC_STATE_LEADER;
for (int32_t i = 0; i < taosArrayGetSize(req.pVloads); ++i) {
SVnodeLoad *pLoad = taosArrayGet(req.pVloads, i);
if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) {
pMgmt->pData->unsyncedVgId = pLoad->vgId;
pMgmt->pData->vndState = pLoad->syncState;
}
}
SMonMloadInfo minfo = {0}; SMonMloadInfo minfo = {0};
dmGetMnodeLoads(pMgmt, &minfo); dmGetMnodeLoads(pMgmt, &minfo);
pMgmt->pData->mndState = minfo.load.syncState;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);
......
...@@ -41,30 +41,13 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -41,30 +41,13 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
} }
pMgmt->pDnode = pInput->pDnode; pMgmt->pDnode = pInput->pDnode;
pMgmt->pData = pInput->pData;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->processCreateNodeFp = pInput->processCreateNodeFp; pMgmt->processCreateNodeFp = pInput->processCreateNodeFp;
pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp; pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp;
taosInitRWLatch(&pMgmt->pData->latch);
pMgmt->pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->pData->dnodeHash == NULL) {
dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadEps(pMgmt->pData) != 0) {
dError("failed to read file since %s", terrstr());
return -1;
}
if (pMgmt->pData->dropped) {
dError("dnode will not start since its already dropped");
return -1;
}
if (dmStartWorker(pMgmt) != 0) { if (dmStartWorker(pMgmt) != 0) {
return -1; return -1;
...@@ -82,19 +65,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -82,19 +65,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
static void dmCloseMgmt(SDnodeMgmt *pMgmt) { static void dmCloseMgmt(SDnodeMgmt *pMgmt) {
dInfo("dnode-mgmt start to clean up"); dInfo("dnode-mgmt start to clean up");
dmStopWorker(pMgmt); dmStopWorker(pMgmt);
taosWLockLatch(&pMgmt->pData->latch);
if (pMgmt->pData->dnodeEps != NULL) {
taosArrayDestroy(pMgmt->pData->dnodeEps);
pMgmt->pData->dnodeEps = NULL;
}
if (pMgmt->pData->dnodeHash != NULL) {
taosHashCleanup(pMgmt->pData->dnodeHash);
pMgmt->pData->dnodeHash = NULL;
}
taosWUnLockLatch(&pMgmt->pData->latch);
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
dInfo("dnode-mgmt is cleaned up"); dInfo("dnode-mgmt is cleaned up");
} }
......
...@@ -53,28 +53,48 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -53,28 +53,48 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
dInfo("dnode will run in child-process mode, node:%s", pWrapper->name); dInfo("dnode will run in child-process mode, node:%s", pWrapper->name);
} }
pDnode->data.dnodeId = 0; SDnodeData *pData = &pDnode->data;
pDnode->data.clusterId = 0; pData->dnodeId = 0;
pDnode->data.dnodeVer = 0; pData->clusterId = 0;
pDnode->data.updateTime = 0; pData->dnodeVer = 0;
pDnode->data.rebootTime = taosGetTimestampMs(); pData->updateTime = 0;
pDnode->data.dropped = 0; pData->rebootTime = taosGetTimestampMs();
pDnode->data.localEp = strdup(pOption->localEp); pData->dropped = 0;
pDnode->data.localFqdn = strdup(pOption->localFqdn); pData->stopped = 0;
pDnode->data.firstEp = strdup(pOption->firstEp); pData->localEp = strdup(pOption->localEp);
pDnode->data.secondEp = strdup(pOption->secondEp); pData->localFqdn = strdup(pOption->localFqdn);
pDnode->data.serverPort = pOption->serverPort; pData->firstEp = strdup(pOption->firstEp);
pDnode->data.supportVnodes = pOption->numOfSupportVnodes; pData->secondEp = strdup(pOption->secondEp);
pDnode->data.numOfDisks = pOption->numOfDisks; pData->supportVnodes = pOption->numOfSupportVnodes;
pDnode->data.disks = pOption->disks; pData->serverPort = pOption->serverPort;
pDnode->data.dataDir = strdup(pOption->dataDir); pData->numOfDisks = pOption->numOfDisks;
pData->disks = pOption->disks;
if (pDnode->data.dataDir == NULL || pDnode->data.localEp == NULL || pDnode->data.localFqdn == NULL || pData->dataDir = strdup(pOption->dataDir);
pDnode->data.firstEp == NULL || pDnode->data.secondEp == NULL) {
if (pData->dataDir == NULL || pData->localEp == NULL || pData->localFqdn == NULL ||
pData->firstEp == NULL || pData->secondEp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pData->dnodeHash == NULL) {
dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadEps(pData) != 0) {
dError("failed to read file since %s", terrstr());
return -1;
}
if (pData->dropped) {
dError("dnode will not start since its already dropped");
return -1;
}
taosInitRWLatch(&pData->latch);
taosThreadMutexInit(&pDnode->mutex, NULL); taosThreadMutexInit(&pDnode->mutex, NULL);
return 0; return 0;
} }
...@@ -90,11 +110,23 @@ static void dmClearVars(SDnode *pDnode) { ...@@ -90,11 +110,23 @@ static void dmClearVars(SDnode *pDnode) {
pDnode->lockfile = NULL; pDnode->lockfile = NULL;
} }
taosMemoryFreeClear(pDnode->data.localEp); SDnodeData *pData = &pDnode->data;
taosMemoryFreeClear(pDnode->data.localFqdn); taosWLockLatch(&pData->latch);
taosMemoryFreeClear(pDnode->data.firstEp); if (pData->dnodeEps != NULL) {
taosMemoryFreeClear(pDnode->data.secondEp); taosArrayDestroy(pData->dnodeEps);
taosMemoryFreeClear(pDnode->data.dataDir); pData->dnodeEps = NULL;
}
if (pData->dnodeHash != NULL) {
taosHashCleanup(pData->dnodeHash);
pData->dnodeHash = NULL;
}
taosWUnLockLatch(&pData->latch);
taosMemoryFreeClear(pData->localEp);
taosMemoryFreeClear(pData->localFqdn);
taosMemoryFreeClear(pData->firstEp);
taosMemoryFreeClear(pData->secondEp);
taosMemoryFreeClear(pData->dataDir);
taosThreadMutexDestroy(&pDnode->mutex); taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
...@@ -163,7 +195,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { ...@@ -163,7 +195,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER; goto _OVER;
} }
if (OnlyInSingleProc(pDnode->ptype) && InParentProc(pDnode->ptype)) { if (OnlyInSingleProc(pDnode->ptype) || InParentProc(pDnode->ptype)) {
pDnode->lockfile = dmCheckRunning(pOption->dataDir); pDnode->lockfile = dmCheckRunning(pOption->dataDir);
if (pDnode->lockfile == NULL) { if (pDnode->lockfile == NULL) {
goto _OVER; goto _OVER;
......
...@@ -90,9 +90,6 @@ typedef struct { ...@@ -90,9 +90,6 @@ typedef struct {
int64_t dnodeVer; int64_t dnodeVer;
int64_t updateTime; int64_t updateTime;
int64_t rebootTime; int64_t rebootTime;
int32_t unsyncedVgId;
ESyncState vndState;
ESyncState mndState;
bool dropped; bool dropped;
bool stopped; bool stopped;
SEpSet mnodeEps; SEpSet mnodeEps;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册