提交 8617af2a 编写于 作者: dengyihao's avatar dengyihao

fix queue failure

上级 eb52e733
...@@ -78,7 +78,7 @@ bool taosQueueEmpty(STaosQueue *queue) { ...@@ -78,7 +78,7 @@ bool taosQueueEmpty(STaosQueue *queue) {
bool empty = false; bool empty = false;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) { if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) {
empty = true; empty = true;
} }
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
...@@ -162,7 +162,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { ...@@ -162,7 +162,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
pNode->next = NULL; pNode->next = NULL;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
if (queue->memLimit > 0 && queue->memOfItems + pNode->size > queue->memLimit) { if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
queue->memLimit, tstrerror(code)); queue->memLimit, tstrerror(code));
...@@ -208,7 +208,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { ...@@ -208,7 +208,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--; queue->numOfItems--;
queue->memOfItems -= pNode->size; queue->memOfItems -= (pNode->size + pNode->dataSize);
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1; code = 1;
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
...@@ -413,7 +413,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) ...@@ -413,7 +413,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
// queue->numOfItems--; // queue->numOfItems--;
queue->memOfItems -= pNode->size; queue->memOfItems -= (pNode->size + pNode->dataSize);
atomic_sub_fetch_32(&qset->numOfItems, 1); atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1; code = 1;
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1, uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册