diff --git a/include/util/tqueue.h b/include/util/tqueue.h index da409a90bb96c2b19ad081c4599a9fa75de1ad4e..8b46bbd0642366d9dce26f406939b3c3112cb97e 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -59,6 +59,47 @@ typedef enum { typedef void (*FItem)(SQueueInfo *pInfo, void *pItem); typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems); +typedef struct STaosQnode STaosQnode; + +typedef struct STaosQnode { + STaosQnode *next; + STaosQueue *queue; + int64_t timestamp; + int32_t size; + int8_t itype; + int8_t reserved[3]; + char item[]; +} STaosQnode; + +typedef struct STaosQueue { + STaosQnode *head; + STaosQnode *tail; + STaosQueue *next; // for queue set + STaosQset *qset; // for queue set + void *ahandle; // for queue set + FItem itemFp; + FItems itemsFp; + TdThreadMutex mutex; + int64_t memOfItems; + int32_t numOfItems; + int64_t threadId; +} STaosQueue; + +typedef struct STaosQset { + STaosQueue *head; + STaosQueue *current; + TdThreadMutex mutex; + tsem_t sem; + int32_t numOfQueues; + int32_t numOfItems; +} STaosQset; + +typedef struct STaosQall { + STaosQnode *current; + STaosQnode *start; + int32_t numOfItems; +} STaosQall; + STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index f36604eb27a299822bb57d578256a1b590762ba0..16cfa480b16117f8e93924ab6bc9669438934e74 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -86,22 +86,34 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { taosThreadRwlockUnlock(&pMgmt->lock); vmReleaseVnode(pMgmt, pVnode); - dTrace("vgId:%d, wait for vnode ref become 0", pVnode->vgId); + dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId); while (pVnode->refCount > 0) taosMsleep(10); - dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId); + dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteQ, + pVnode->pWriteQ->threadId); while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncQ, + pVnode->pWriteQ->threadId); while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlQ, + pVnode->pWriteQ->threadId); + while (!taosQueueEmpty(pVnode->pSyncCtrlQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyQ, + pVnode->pWriteQ->threadId); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, + pVnode->pWriteQ->threadId); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); - dTrace("vgId:%d, vnode queue is empty", pVnode->vgId); + dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); vmFreeQueue(pMgmt, pVnode); vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; - dDebug("vgId:%d, vnode is closed", pVnode->vgId); + dInfo("vgId:%d, vnode is closed", pVnode->vgId); if (pVnode->dropped) { dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 40d0e32c2b43f2c7170e8a8a085e8214399999c2..a5070e09aa0ee2e280646237a42d5bc73c8c05da 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -320,12 +320,17 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { return -1; } - dDebug("vgId:%d, write-queue:%p is alloced", pVnode->vgId, pVnode->pWriteQ); - dDebug("vgId:%d, sync-queue:%p is alloced", pVnode->vgId, pVnode->pSyncQ); - dDebug("vgId:%d, apply-queue:%p is alloced", pVnode->vgId, pVnode->pApplyQ); - dDebug("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ); - dDebug("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ); - dDebug("vgId:%d, fetch-queue:%p is alloced", pVnode->vgId, pVnode->pFetchQ); + dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteQ, + pVnode->pWriteQ->threadId); + dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncQ, pVnode->pSyncQ->threadId); + dInfo("vgId:%d, sync-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlQ, + pVnode->pSyncCtrlQ->threadId); + dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyQ, + pVnode->pApplyQ->threadId); + dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ); + dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, + pVnode->pFetchQ->threadId); + dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index e27ae074605005881c0cf4cfbd708fd23ecae1e1..7f3b1db7c3d76be502f2c546bcde8e28fb30bcb6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -244,16 +244,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - if (pMsg == NULL || pMsg->pCont == NULL) { - return -1; - } - - if (msgcb == NULL || msgcb->putToQueueFp == NULL) { - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - return -1; - } - int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); @@ -263,16 +253,6 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { } static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - if (pMsg == NULL || pMsg->pCont == NULL) { - return -1; - } - - if (msgcb == NULL || msgcb->putToQueueFp == NULL) { - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - return -1; - } - int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); @@ -342,52 +322,26 @@ static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, cons TMSG_INFO(pMsg->msgType)); } -#define USE_TSDB_SNAPSHOT - static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { -#ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; SSnapshotParam *pSnapshotParam = pParam; int32_t code = vnodeSnapReaderOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapReader **)ppReader); return code; -#else - *ppReader = taosMemoryMalloc(32); - return 0; -#endif } static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { -#ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; int32_t code = vnodeSnapReaderClose(pReader); return code; -#else - taosMemoryFree(pReader); - return 0; -#endif } static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { -#ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len); return code; -#else - static int32_t times = 0; - if (times++ < 5) { - *len = 64; - *ppBuf = taosMemoryMalloc(*len); - snprintf(*ppBuf, *len, "snapshot block %d", times); - } else { - *len = 0; - *ppBuf = NULL; - } - return 0; -#endif } static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) { -#ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; SSnapshotParam *pSnapshotParam = pParam; @@ -404,14 +358,9 @@ static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter); return code; -#else - *ppWriter = taosMemoryMalloc(32); - return 0; -#endif } static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { -#ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); @@ -419,22 +368,14 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code); return code; -#else - taosMemoryFree(pWriter); - return 0; -#endif } static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { -#ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len); int32_t code = vnodeSnapWrite(pWriter, pBuf, len); vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len); return code; -#else - return 0; -#endif } static void vnodeRestoreFinish(const SSyncFSM *pFsm) { @@ -461,7 +402,6 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; vDebug("vgId:%d, become follower", pVnode->config.vgId); - // clear old leader resource taosThreadMutexLock(&pVnode->lock); if (pVnode->blocked) { pVnode->blocked = false; @@ -474,15 +414,6 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) { static void vnodeBecomeLeader(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; vDebug("vgId:%d, become leader", pVnode->config.vgId); - -#if 0 - taosThreadMutexLock(&pVnode->lock); - if (pVnode->blocked) { - pVnode->blocked = false; - tsem_post(&pVnode->syncSem); - } - taosThreadMutexUnlock(&pVnode->lock); -#endif } static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 19b9b89caba70b8769de2e79a9902ec8721f7784..c8f128e6663f1a0545cb0374a174a60bd303a7a8 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -21,46 +21,6 @@ int64_t tsRpcQueueMemoryAllowed = 0; int64_t tsRpcQueueMemoryUsed = 0; -typedef struct STaosQnode STaosQnode; - -typedef struct STaosQnode { - STaosQnode *next; - STaosQueue *queue; - int64_t timestamp; - int32_t size; - int8_t itype; - int8_t reserved[3]; - char item[]; -} STaosQnode; - -typedef struct STaosQueue { - STaosQnode *head; - STaosQnode *tail; - STaosQueue *next; // for queue set - STaosQset *qset; // for queue set - void *ahandle; // for queue set - FItem itemFp; - FItems itemsFp; - TdThreadMutex mutex; - int64_t memOfItems; - int32_t numOfItems; -} STaosQueue; - -typedef struct STaosQset { - STaosQueue *head; - STaosQueue *current; - TdThreadMutex mutex; - tsem_t sem; - int32_t numOfQueues; - int32_t numOfItems; -} STaosQset; - -typedef struct STaosQall { - STaosQnode *current; - STaosQnode *start; - int32_t numOfItems; -} STaosQall; - STaosQueue *taosOpenQueue() { STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue)); if (queue == NULL) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index f7d4173d3fcb4e30c927543436acb83596bf7339..27c641555fe524e4dd3a2c87f2b6cdc60bd1ddd6 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { worker->pool = pool; } - uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); + uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); return 0; } @@ -73,11 +73,12 @@ static void *tQWorkerThreadFp(SQWorker *worker) { taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("worker:%s:%d is running", pool->name, worker->id); + uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, taosGetSelfPthreadId()); while (1) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { - uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); + uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset, + taosGetSelfPthreadId()); break; } @@ -191,12 +192,13 @@ static void *tWWorkerThreadFp(SWWorker *worker) { taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("worker:%s:%d is running", pool->name, worker->id); + uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, taosGetSelfPthreadId()); while (1) { numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo); if (numOfMsgs == 0) { - uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); + uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset, + taosGetSelfPthreadId()); break; } @@ -244,7 +246,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { pool->nextId = (pool->nextId + 1) % pool->max; } - uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + queue->threadId = taosGetPthreadId(worker->thread); + uDebug("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId); code = 0; _OVER: