提交 8b877865 编写于 作者: S Shengliang Guan

fix(cluster): create qnode in multi-process mode may crash

上级 6958f3ab
......@@ -275,19 +275,24 @@ static void dmProcessProcHandle(void *handle) {
}
static void dmWatchNodes(SDnode *pDnode) {
taosThreadMutexLock(&pDnode->mutex);
if (pDnode->ptype == DND_PROC_PARENT) {
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->procType != DND_PROC_PARENT) continue;
if (pDnode->ntype == NODE_END) continue;
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle);
if (pWrapper->procObj) {
taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle);
}
dmNewNodeProc(pWrapper, n);
}
}
}
taosThreadMutexUnlock(&pDnode->mutex);
}
int32_t dmRun(SDnode *pDnode) {
......
......@@ -116,7 +116,7 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs
return -1;
}
taosWLockLatch(&pDnode->wrapperLock);
taosThreadMutexLock(&pDnode->mutex);
pWrapper = &pDnode->wrappers[ntype];
if (taosMkDir(pWrapper->path) != 0) {
......@@ -132,10 +132,11 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs
dDebug("node:%s, has been created", pWrapper->name);
pWrapper->required = true;
pWrapper->deployed = true;
pWrapper->procType = pDnode->ptype;
(void)dmOpenNode(pWrapper);
}
taosWUnLockLatch(&pDnode->wrapperLock);
taosThreadMutexUnlock(&pDnode->mutex);
return code;
}
......@@ -147,24 +148,24 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg)
return -1;
}
taosWLockLatch(&pWrapper->latch);
taosThreadMutexLock(&pDnode->mutex);
int32_t code = (*pWrapper->fp.dropFp)(pWrapper, pMsg);
if (code != 0) {
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been dropped", pWrapper->name);
pWrapper->required = false;
pWrapper->deployed = false;
taosRemoveDir(pWrapper->path);
}
taosWUnLockLatch(&pWrapper->latch);
dmReleaseWrapper(pWrapper);
if (code == 0) {
dmCloseNode(pWrapper);
pWrapper->required = false;
pWrapper->deployed = false;
taosRemoveDir(pWrapper->path);
}
taosThreadMutexUnlock(&pDnode->mutex);
return code;
}
......
......@@ -48,7 +48,7 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
}
taosInitRWLatch(&pDnode->data.latch);
taosInitRWLatch(&pDnode->wrapperLock);
taosThreadMutexInit(&pDnode->mutex, NULL);
return 0;
}
......@@ -67,6 +67,8 @@ static void dmClearVars(SDnode *pDnode) {
taosMemoryFreeClear(pDnode->data.firstEp);
taosMemoryFreeClear(pDnode->data.secondEp);
taosMemoryFreeClear(pDnode->data.dataDir);
taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
taosMemoryFree(pDnode);
dDebug("dnode memory is cleared, data:%p", pDnode);
}
......
......@@ -140,7 +140,7 @@ typedef struct SDnode {
SStartupReq startup;
SDnodeTrans trans;
SDnodeData data;
SRWLatch wrapperLock;
TdThreadMutex mutex;
SMgmtWrapper wrappers[NODE_END];
} SDnode;
......
......@@ -147,8 +147,8 @@ static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleR
static void uvDestroyConn(uv_handle_t* handle);
// server and worker thread
static void* workerThread(void* arg);
static void* acceptThread(void* arg);
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
// add handle loop
static bool addHandleToWorkloop(void* arg);
......@@ -538,7 +538,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
}
}
void* acceptThread(void* arg) {
void* transAcceptThread(void* arg) {
// opt
setThreadName("trans-accept");
SServerObj* srv = (SServerObj*)arg;
......@@ -596,7 +596,7 @@ static bool addHandleToAcceptloop(void* arg) {
}
return true;
}
void* workerThread(void* arg) {
void* transWorkerThread(void* arg) {
setThreadName("trans-worker");
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
uv_run(pThrd->loop, UV_RUN_DEFAULT);
......@@ -686,7 +686,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
if (false == addHandleToWorkloop(thrd)) {
goto End;
}
int err = taosThreadCreate(&(thrd->thread), NULL, workerThread, (void*)(thrd));
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
if (err == 0) {
tDebug("sucess to create worker-thread %d", i);
// printf("thread %d create\n", i);
......@@ -698,7 +698,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
if (false == addHandleToAcceptloop(srv)) {
goto End;
}
int err = taosThreadCreate(&srv->thread, NULL, acceptThread, (void*)srv);
int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
if (err == 0) {
tDebug("success to create accept-thread");
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册