提交 c049678a 编写于 作者: S Shengliang Guan

TD-11265 fix deadlock while quit taosd

上级 7330b0f8
...@@ -22,10 +22,13 @@ ...@@ -22,10 +22,13 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SWorkerPool SWorkerPool;
typedef struct SMWorkerPool SMWorkerPool;
typedef struct SWorker { typedef struct SWorker {
int32_t id; // worker ID int32_t id; // worker ID
pthread_t thread; // thread pthread_t thread; // thread
struct SWorkerPool *pool; SWorkerPool *pool;
} SWorker; } SWorker;
typedef struct SWorkerPool { typedef struct SWorkerPool {
...@@ -39,11 +42,11 @@ typedef struct SWorkerPool { ...@@ -39,11 +42,11 @@ typedef struct SWorkerPool {
} SWorkerPool; } SWorkerPool;
typedef struct SMWorker { typedef struct SMWorker {
int32_t id; // worker id int32_t id; // worker id
pthread_t thread; // thread pthread_t thread; // thread
taos_qall qall; taos_qall qall;
taos_qset qset; // queue set taos_qset qset; // queue set
struct SMWorkerPool *pool; SMWorkerPool *pool;
} SMWorker; } SMWorker;
typedef struct SMWorkerPool { typedef struct SMWorkerPool {
......
...@@ -30,7 +30,10 @@ static struct { ...@@ -30,7 +30,10 @@ static struct {
char configDir[PATH_MAX]; char configDir[PATH_MAX];
} global = {0}; } global = {0};
void dmnSigintHandle(int signum, void *info, void *ctx) { global.stop = true; } void dmnSigintHandle(int signum, void *info, void *ctx) {
uError("singal:%d is received", signum);
global.stop = true;
}
void dmnSetSignalHandle() { void dmnSetSignalHandle() {
taosSetSignal(SIGTERM, dmnSigintHandle); taosSetSignal(SIGTERM, dmnSigintHandle);
......
...@@ -74,31 +74,31 @@ typedef struct { ...@@ -74,31 +74,31 @@ typedef struct {
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
SWorkerPool mgmtPool; char *file;
SWorkerPool readPool; SMnode *pMnode;
SWorkerPool writePool; SRWLatch latch;
SWorkerPool syncPool;
taos_queue pReadQ; taos_queue pReadQ;
taos_queue pWriteQ; taos_queue pWriteQ;
taos_queue pApplyQ; taos_queue pApplyQ;
taos_queue pSyncQ; taos_queue pSyncQ;
taos_queue pMgmtQ; taos_queue pMgmtQ;
char *file; SWorkerPool mgmtPool;
SMnode *pMnode; SWorkerPool readPool;
SRWLatch latch; SWorkerPool writePool;
SWorkerPool syncPool;
} SMnodeMgmt; } SMnodeMgmt;
typedef struct { typedef struct {
SHashObj *hash; SHashObj *hash;
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
taos_queue pMgmtQ;
SWorkerPool mgmtPool; SWorkerPool mgmtPool;
SWorkerPool queryPool; SWorkerPool queryPool;
SWorkerPool fetchPool; SWorkerPool fetchPool;
SMWorkerPool syncPool; SMWorkerPool syncPool;
SMWorkerPool writePool; SMWorkerPool writePool;
taos_queue pMgmtQ;
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
} SVnodesMgmt; } SVnodesMgmt;
typedef struct { typedef struct {
......
...@@ -294,14 +294,14 @@ static void dndStopMnodeWorker(SDnode *pDnode) { ...@@ -294,14 +294,14 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10);
dndCleanupMnodeReadWorker(pDnode);
dndCleanupMnodeWriteWorker(pDnode);
dndCleanupMnodeSyncWorker(pDnode);
dndFreeMnodeReadQueue(pDnode); dndFreeMnodeReadQueue(pDnode);
dndFreeMnodeWriteQueue(pDnode); dndFreeMnodeWriteQueue(pDnode);
dndFreeMnodeApplyQueue(pDnode); dndFreeMnodeApplyQueue(pDnode);
dndFreeMnodeSyncQueue(pDnode); dndFreeMnodeSyncQueue(pDnode);
dndCleanupMnodeReadWorker(pDnode);
dndCleanupMnodeWriteWorker(pDnode);
dndCleanupMnodeSyncWorker(pDnode);
} }
static bool dndNeedDeployMnode(SDnode *pDnode) { static bool dndNeedDeployMnode(SDnode *pDnode) {
...@@ -714,6 +714,7 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { ...@@ -714,6 +714,7 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) { static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->mgmtPool); tWorkerCleanup(&pMgmt->mgmtPool);
dDebug("mnode mgmt worker is stopped");
} }
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
...@@ -750,6 +751,7 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { ...@@ -750,6 +751,7 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
static void dndCleanupMnodeReadWorker(SDnode *pDnode) { static void dndCleanupMnodeReadWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->readPool); tWorkerCleanup(&pMgmt->readPool);
dDebug("mnode read worker is stopped");
} }
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) { static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) {
...@@ -803,6 +805,7 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { ...@@ -803,6 +805,7 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
static void dndCleanupMnodeWriteWorker(SDnode *pDnode) { static void dndCleanupMnodeWriteWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->writePool); tWorkerCleanup(&pMgmt->writePool);
dDebug("mnode write worker is stopped");
} }
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) { static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) {
...@@ -839,6 +842,7 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { ...@@ -839,6 +842,7 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) {
static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { static void dndCleanupMnodeSyncWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->syncPool); tWorkerCleanup(&pMgmt->syncPool);
dDebug("mnode sync worker is stopped");
} }
int32_t dndInitMnode(SDnode *pDnode) { int32_t dndInitMnode(SDnode *pDnode) {
......
...@@ -160,6 +160,7 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -160,6 +160,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
rpcInit.user = INTERNAL_USER; rpcInit.user = INTERNAL_USER;
rpcInit.ckey = INTERNAL_CKEY; rpcInit.ckey = INTERNAL_CKEY;
rpcInit.secret = INTERNAL_SECRET; rpcInit.secret = INTERNAL_SECRET;
rpcInit.parent = pDnode;
pMgmt->clientRpc = rpcOpen(&rpcInit); pMgmt->clientRpc = rpcOpen(&rpcInit);
if (pMgmt->clientRpc == NULL) { if (pMgmt->clientRpc == NULL) {
...@@ -167,6 +168,7 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -167,6 +168,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
return -1; return -1;
} }
dDebug("dnode rpc client is initialized");
return 0; return 0;
} }
...@@ -175,7 +177,7 @@ static void dndCleanupClient(SDnode *pDnode) { ...@@ -175,7 +177,7 @@ static void dndCleanupClient(SDnode *pDnode) {
if (pMgmt->clientRpc) { if (pMgmt->clientRpc) {
rpcClose(pMgmt->clientRpc); rpcClose(pMgmt->clientRpc);
pMgmt->clientRpc = NULL; pMgmt->clientRpc = NULL;
dInfo("dnode peer rpc client is closed"); dDebug("dnode rpc client is closed");
} }
} }
...@@ -315,6 +317,7 @@ static int32_t dndInitServer(SDnode *pDnode) { ...@@ -315,6 +317,7 @@ static int32_t dndInitServer(SDnode *pDnode) {
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000; rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.afp = dndRetrieveUserAuthInfo; rpcInit.afp = dndRetrieveUserAuthInfo;
rpcInit.parent = pDnode;
pMgmt->serverRpc = rpcOpen(&rpcInit); pMgmt->serverRpc = rpcOpen(&rpcInit);
if (pMgmt->serverRpc == NULL) { if (pMgmt->serverRpc == NULL) {
...@@ -322,6 +325,7 @@ static int32_t dndInitServer(SDnode *pDnode) { ...@@ -322,6 +325,7 @@ static int32_t dndInitServer(SDnode *pDnode) {
return -1; return -1;
} }
dDebug("dnode rpc server is initialized");
return 0; return 0;
} }
...@@ -330,6 +334,7 @@ static void dndCleanupServer(SDnode *pDnode) { ...@@ -330,6 +334,7 @@ static void dndCleanupServer(SDnode *pDnode) {
if (pMgmt->serverRpc) { if (pMgmt->serverRpc) {
rpcClose(pMgmt->serverRpc); rpcClose(pMgmt->serverRpc);
pMgmt->serverRpc = NULL; pMgmt->serverRpc = NULL;
dDebug("dnode rpc server is closed");
} }
} }
...@@ -347,6 +352,7 @@ int32_t dndInitTrans(SDnode *pDnode) { ...@@ -347,6 +352,7 @@ int32_t dndInitTrans(SDnode *pDnode) {
} }
void dndCleanupTrans(SDnode *pDnode) { void dndCleanupTrans(SDnode *pDnode) {
dInfo("dnode-transport start to clean up");
dndCleanupServer(pDnode); dndCleanupServer(pDnode);
dndCleanupClient(pDnode); dndCleanupClient(pDnode);
dInfo("dnode-transport is cleaned up"); dInfo("dnode-transport is cleaned up");
......
...@@ -197,7 +197,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { ...@@ -197,7 +197,7 @@ SDnode *dndInit(SDnodeOpt *pOption) {
dndReportStartup(pDnode, "TDengine", "initialized successfully"); dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully"); dInfo("TDengine is initialized successfully");
return 0; return pDnode;
} }
void dndCleanup(SDnode *pDnode) { void dndCleanup(SDnode *pDnode) {
......
...@@ -107,7 +107,7 @@ bool taosQueueEmpty(taos_queue param) { ...@@ -107,7 +107,7 @@ bool taosQueueEmpty(taos_queue param) {
if (queue->head == NULL && queue->tail == NULL) { if (queue->head == NULL && queue->tail == NULL) {
empty = true; empty = true;
} }
pthread_mutex_destroy(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
return empty; return empty;
} }
......
...@@ -50,7 +50,7 @@ void tWorkerCleanup(SWorkerPool *pool) { ...@@ -50,7 +50,7 @@ void tWorkerCleanup(SWorkerPool *pool) {
} }
} }
free(pool->workers); tfree(pool->workers);
taosCloseQset(pool->qset); taosCloseQset(pool->qset);
pthread_mutex_destroy(&pool->mutex); pthread_mutex_destroy(&pool->mutex);
...@@ -159,7 +159,7 @@ void tMWorkerCleanup(SMWorkerPool *pool) { ...@@ -159,7 +159,7 @@ void tMWorkerCleanup(SMWorkerPool *pool) {
} }
} }
free(pool->workers); tfree(pool->workers);
pthread_mutex_destroy(&pool->mutex); pthread_mutex_destroy(&pool->mutex);
uInfo("worker:%s is closed", pool->name); uInfo("worker:%s is closed", pool->name);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册