diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index d9abf0d7c358fec3c6efc8185b21ffc504507fdb..8c6d6243eb4f677e9a5436b4449e07df011d2b52 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -61,6 +61,7 @@ taos_queue taosOpenQueue() { pthread_mutex_init(&queue->mutex, NULL); + uTrace("queue:%p is openned", queue); return queue; } @@ -89,6 +90,8 @@ void taosCloseQueue(taos_queue param) { pthread_mutex_unlock(&queue->mutex); pthread_mutex_destroy(&queue->mutex); free(queue); + + uTrace("queue:%p is closed", queue); } void *taosAllocateQitem(int size) { @@ -161,7 +164,7 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) { } void *taosAllocateQall() { - void *p = malloc(sizeof(STaosQall)); + void *p = calloc(sizeof(STaosQall), 1); return p; } @@ -230,15 +233,29 @@ taos_qset taosOpenQset() { pthread_mutex_init(&qset->mutex, NULL); tsem_init(&qset->sem, 0, 0); + uTrace("qset:%p is openned", qset); return qset; } void taosCloseQset(taos_qset param) { if (param == NULL) return; STaosQset *qset = (STaosQset *)param; + + // remove all the queues from qset + pthread_mutex_lock(&qset->mutex); + while (qset->head) { + STaosQueue *queue = qset->head; + qset->head = qset->head->next; + + queue->qset = NULL; + queue->next = NULL; + } + pthread_mutex_unlock(&qset->mutex); + pthread_mutex_destroy(&qset->mutex); tsem_destroy(&qset->sem); free(qset); + uTrace("qset:%p is closed", qset); } // tsem_post 'qset->sem', so that reader threads waiting for it @@ -269,6 +286,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { pthread_mutex_unlock(&qset->mutex); + uTrace("queue:%p is added into qset:%p", queue, qset); return 0; } @@ -288,6 +306,7 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) { STaosQueue *prev = qset->head; tqueue = qset->head->next; while (tqueue) { + assert(tqueue->qset); if (tqueue== queue) { prev->next = tqueue->next; break; @@ -305,11 +324,14 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) { pthread_mutex_lock(&queue->mutex); atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems); queue->qset = NULL; + queue->next = NULL; pthread_mutex_unlock(&queue->mutex); } } pthread_mutex_unlock(&qset->mutex); + + uTrace("queue:%p is removed from qset:%p", queue, qset); } int taosGetQueueNumber(taos_qset param) {