提交 fc41f43c 编写于 作者: S Shengliang Guan

fix: deadlock while drop db

上级 a0983cf8
...@@ -28,6 +28,7 @@ typedef struct SWWorkerPool SWWorkerPool; ...@@ -28,6 +28,7 @@ typedef struct SWWorkerPool SWWorkerPool;
typedef struct SQWorker { typedef struct SQWorker {
int32_t id; // worker ID int32_t id; // worker ID
TdThread thread; // thread TdThread thread; // thread
int64_t pid;
SQWorkerPool *pool; SQWorkerPool *pool;
} SQWorker; } SQWorker;
...@@ -44,6 +45,7 @@ typedef struct SQWorkerPool { ...@@ -44,6 +45,7 @@ typedef struct SQWorkerPool {
typedef struct SWWorker { typedef struct SWWorker {
int32_t id; // worker id int32_t id; // worker id
TdThread thread; // thread TdThread thread; // thread
int64_t pid;
STaosQall *qall; STaosQall *qall;
STaosQset *qset; // queue set STaosQset *qset; // queue set
SWWorkerPool *pool; SWWorkerPool *pool;
......
...@@ -79,35 +79,43 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { ...@@ -79,35 +79,43 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
char path[TSDB_FILENAME_LEN] = {0}; char path[TSDB_FILENAME_LEN] = {0};
vnodePreClose(pVnode->pImpl);
taosThreadRwlockWrlock(&pMgmt->lock); taosThreadRwlockWrlock(&pMgmt->lock);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
taosThreadRwlockUnlock(&pMgmt->lock); taosThreadRwlockUnlock(&pMgmt->lock);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
dInfo("vgId:%d, pre close", pVnode->vgId);
vnodePreClose(pVnode->pImpl);
dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId); dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
while (pVnode->refCount > 0) taosMsleep(10); while (pVnode->refCount > 0) taosMsleep(10);
dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteQ, dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteQ,
pVnode->pWriteQ->threadId); pVnode->pWriteQ->threadId);
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncQ, dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncQ,
pVnode->pWriteQ->threadId); pVnode->pSyncQ->threadId);
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlQ, dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlQ,
pVnode->pWriteQ->threadId); pVnode->pSyncCtrlQ->threadId);
while (!taosQueueEmpty(pVnode->pSyncCtrlQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pSyncCtrlQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyQ, dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyQ,
pVnode->pWriteQ->threadId); pVnode->pApplyQ->threadId);
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ); dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
pVnode->pWriteQ->threadId); pVnode->pFetchQ->threadId);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ); dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
vmFreeQueue(pMgmt, pVnode); vmFreeQueue(pMgmt, pVnode);
......
...@@ -97,6 +97,7 @@ bool vnodeShouldRollback(SVnode* pVnode); ...@@ -97,6 +97,7 @@ bool vnodeShouldRollback(SVnode* pVnode);
// vnodeSync.c // vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
void vnodeSyncStart(SVnode* pVnode); void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncPreClose(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
......
...@@ -242,12 +242,7 @@ _err: ...@@ -242,12 +242,7 @@ _err:
return NULL; return NULL;
} }
void vnodePreClose(SVnode *pVnode) { void vnodePreClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); }
if (pVnode) {
syncLeaderTransfer(pVnode->sync);
syncPreStop(pVnode->sync);
}
}
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
......
...@@ -474,12 +474,25 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -474,12 +474,25 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
} }
void vnodeSyncStart(SVnode *pVnode) { void vnodeSyncStart(SVnode *pVnode) {
vDebug("vgId:%d, start sync", pVnode->config.vgId); vInfo("vgId:%d, start sync", pVnode->config.vgId);
syncStart(pVnode->sync); syncStart(pVnode->sync);
} }
void vnodeSyncPreClose(SVnode *pVnode) {
vInfo("vgId:%d, pre close sync", pVnode->config.vgId);
syncLeaderTransfer(pVnode->sync);
syncPreStop(pVnode->sync);
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
pVnode->blocked = false;
tsem_post(&pVnode->syncSem);
}
taosThreadMutexUnlock(&pVnode->lock);
}
void vnodeSyncClose(SVnode *pVnode) { void vnodeSyncClose(SVnode *pVnode) {
vDebug("vgId:%d, close sync", pVnode->config.vgId); vInfo("vgId:%d, close sync", pVnode->config.vgId);
syncStop(pVnode->sync); syncStop(pVnode->sync);
} }
......
...@@ -73,12 +73,13 @@ static void *tQWorkerThreadFp(SQWorker *worker) { ...@@ -73,12 +73,13 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName(pool->name); setThreadName(pool->name);
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, taosGetSelfPthreadId()); worker->pid = taosGetSelfPthreadId();
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
while (1) { while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset, uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
taosGetSelfPthreadId()); worker->pid);
break; break;
} }
...@@ -125,7 +126,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { ...@@ -125,7 +126,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
} }
taosThreadMutexUnlock(&pool->mutex); taosThreadMutexUnlock(&pool->mutex);
uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
return queue; return queue;
} }
...@@ -192,13 +193,14 @@ static void *tWWorkerThreadFp(SWWorker *worker) { ...@@ -192,13 +193,14 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName(pool->name); setThreadName(pool->name);
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, taosGetSelfPthreadId()); worker->pid = taosGetSelfPthreadId();
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo); numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset, uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
taosGetSelfPthreadId()); worker->pid);
break; break;
} }
...@@ -246,8 +248,9 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { ...@@ -246,8 +248,9 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
pool->nextId = (pool->nextId + 1) % pool->max; pool->nextId = (pool->nextId + 1) % pool->max;
} }
queue->threadId = taosGetPthreadId(worker->thread); while (worker->pid <= 0) taosMsleep(10);
uDebug("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId); queue->threadId = worker->pid;
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId);
code = 0; code = 0;
_OVER: _OVER:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册