/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "ulog.h"
#include "taoserror.h"
#include "tqueue.h"

// A single linked-list node. The user-visible item is the flexible array
// that immediately follows the link pointer, so an item pointer can be
// converted back to its node by subtracting sizeof(STaosQnode).
typedef struct STaosQnode {
  struct STaosQnode *next;
  char               item[];
} STaosQnode;

// A FIFO queue of nodes protected by its own mutex. A queue may optionally
// be a member of one queue set (qset).
typedef struct STaosQueue {
  int32_t             itemSize;
  int32_t             numOfItems;  // number of nodes currently linked between head and tail
  struct STaosQnode  *head;        // oldest node, NULL when empty
  struct STaosQnode  *tail;        // newest node, NULL when empty
  struct STaosQueue  *next;        // for queue set
  struct STaosQset   *qset;        // for queue set
  void               *ahandle;     // for queue set
  FProcessItem        itemFp;
  FProcessItems       itemsFp;
  pthread_mutex_t     mutex;
} STaosQueue;

// A set of queues that readers consume from in round-robin order.
typedef struct STaosQset {
  STaosQueue      *head;         // singly linked list of member queues
  STaosQueue      *current;      // next queue to inspect when reading
  pthread_mutex_t  mutex;
  int32_t          numOfQueues;
  int32_t          numOfItems;   // sum of items over all member queues (updated atomically)
  tsem_t           sem;          // posted once per item written into a member queue
} STaosQset;

// A detached chain of nodes taken out of a queue in one shot.
typedef struct STaosQall {
  STaosQnode *current;  // next node handed out by taosGetQitem
  STaosQnode *start;    // first node of the chain, used by taosResetQitems
  int32_t     itemSize;
  int32_t     numOfItems;
} STaosQall;

// Allocate and initialize an empty queue.
// Returns NULL and sets terrno to TSDB_CODE_COM_OUT_OF_MEMORY on allocation failure.
taos_queue taosOpenQueue() {
  STaosQueue *queue = calloc(1, sizeof(*queue));
  if (queue == NULL) {
    terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
    return NULL;
  }

  pthread_mutex_init(&queue->mutex, NULL);

  uTrace("queue:%p is opened", queue);
  return queue;
}

// Store the per-item and per-batch callbacks on the queue. No-op when param is NULL.
void taosSetQueueFp(taos_queue param, FProcessItem itemFp, FProcessItems itemsFp) {
  if (param == NULL) return;

  STaosQueue *queue = (STaosQueue *)param;
  queue->itemFp = itemFp;
  queue->itemsFp = itemsFp;
}

// Detach the queue from its qset (if any), free all nodes still linked in it,
// and free the queue itself. No-op when param is NULL.
void taosCloseQueue(taos_queue param) {
  if (param == NULL) return;
  STaosQueue *queue = (STaosQueue *)param;

  // Take the node chain and a snapshot of the owning qset under the lock.
  pthread_mutex_lock(&queue->mutex);
  STaosQnode *pNode = queue->head;
  queue->head = NULL;
  STaosQset *qset = queue->qset;
  pthread_mutex_unlock(&queue->mutex);

  // Use the snapshot taken under the lock rather than re-reading queue->qset.
  if (qset) taosRemoveFromQset(qset, queue);

  while (pNode) {
    STaosQnode *pTemp = pNode;
    pNode = pNode->next;
    free(pTemp);
  }

  // Log before the memory is released so the pointer is still valid here.
  uTrace("queue:%p is closed", queue);

  pthread_mutex_destroy(&queue->mutex);
  free(queue);
}

// Returns true when param is NULL or when the queue has neither head nor tail.
bool taosQueueEmpty(taos_queue param) {
  if (param == NULL) return true;
  STaosQueue *queue = (STaosQueue *)param;

  bool empty = false;
  pthread_mutex_lock(&queue->mutex);
  if (queue->head == NULL && queue->tail == NULL) {
    empty = true;
  }
  // Release (not destroy) the mutex: the queue remains in use after this call.
  pthread_mutex_unlock(&queue->mutex);

  return empty;
}

// Allocate a zero-filled node with `size` bytes of payload and return a
// pointer to the payload. Returns NULL on allocation failure or when size
// is negative.
void *taosAllocateQitem(int size) {
  // A negative size would wrap in the size_t addition below and could
  // allocate less than the node header.
  if (size < 0) return NULL;

  STaosQnode *pNode = calloc(1, sizeof(STaosQnode) + (size_t)size);
  if (pNode == NULL) return NULL;

  uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
  return (void *)pNode->item;
}

// Free an item previously returned by taosAllocateQitem. No-op when param is NULL.
void taosFreeQitem(void *param) {
  if (param == NULL) return;

  // Step back from the payload to the start of the enclosing node.
  char *temp = (char *)param;
  temp -= sizeof(STaosQnode);

  uTrace("item:%p, node:%p is freed", param, temp);
  free(temp);
}

// Append an item (obtained from taosAllocateQitem) to the tail of the queue.
// When the queue belongs to a qset, the qset item counter is incremented and
// the qset semaphore is posted once. Always returns 0.
int taosWriteQitem(taos_queue param, void *item) {
  STaosQueue *queue = (STaosQueue *)param;
  STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
  pNode->next = NULL;

  pthread_mutex_lock(&queue->mutex);

  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
    queue->tail = pNode;
  }

  queue->numOfItems++;
  if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
  uTrace("item:%p is put into queue:%p, items:%d", item, queue, queue->numOfItems);

  pthread_mutex_unlock(&queue->mutex);

  if (queue->qset) tsem_post(&queue->qset->sem);

  return 0;
}

// Remove the oldest item from the queue and store it in *pitem.
// Returns 1 when an item was read, 0 when the queue was empty (*pitem untouched).
int taosReadQitem(taos_queue param, void **pitem) {
  STaosQueue *queue = (STaosQueue *)param;
  STaosQnode *pNode = NULL;
  int         code = 0;

  pthread_mutex_lock(&queue->mutex);

  if (queue->head) {
    pNode = queue->head;
    *pitem = pNode->item;
    queue->head = pNode->next;
    if (queue->head == NULL) queue->tail = NULL;
    queue->numOfItems--;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    code = 1;
    uDebug("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems);
  }

  pthread_mutex_unlock(&queue->mutex);

  return code;
}

// Allocate a zero-filled STaosQall; returns NULL on allocation failure.
void *taosAllocateQall() {
  void *p = calloc(1, sizeof(STaosQall));
  return p;
}

// Free a STaosQall allocated by taosAllocateQall. Nodes it references are not freed.
void taosFreeQall(void *param) {
  free(param);
}

// Move every item of the queue into the qall in one step, leaving the queue empty.
// Returns the number of items moved (0 when the queue was empty, in which
// case the qall is reset to empty as well).
int taosReadAllQitems(taos_queue param, taos_qall p2) {
  STaosQueue *queue = (STaosQueue *)param;
  STaosQall  *qall = (STaosQall *)p2;
  int         code = 0;
  bool        empty;

  pthread_mutex_lock(&queue->mutex);

  empty = queue->head == NULL;
  if (!empty) {
    memset(qall, 0, sizeof(STaosQall));
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    qall->itemSize = queue->itemSize;
    code = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
  }

  pthread_mutex_unlock(&queue->mutex);

  // if source queue is empty, we set destination qall to empty too.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }

  return code;
}

// Hand out the next item of a qall and advance its cursor.
// Returns 1 and sets *pitem when an item is available, otherwise 0.
int taosGetQitem(taos_qall param, void **pitem) {
  STaosQall  *qall = (STaosQall *)param;
  STaosQnode *pNode;
  int         num = 0;

  pNode = qall->current;
  if (pNode) qall->current = pNode->next;

  if (pNode) {
    *pitem = pNode->item;
    num = 1;
    uTrace("item:%p is fetched", *pitem);
  }

  return num;
}

// Rewind the qall cursor to the first node of its chain.
void taosResetQitems(taos_qall param) {
  STaosQall *qall = (STaosQall *)param;
  qall->current = qall->start;
}

// Allocate and initialize an empty queue set.
// Returns NULL and sets terrno to TSDB_CODE_COM_OUT_OF_MEMORY on allocation failure.
taos_qset taosOpenQset() {
  STaosQset *qset = calloc(1, sizeof(*qset));
  if (qset == NULL) {
    terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
    return NULL;
  }

  pthread_mutex_init(&qset->mutex, NULL);
  tsem_init(&qset->sem, 0, 0);

  uTrace("qset:%p is opened", qset);
  return qset;
}

// Unlink every member queue from the set (the queues themselves are not
// freed), then destroy and free the set. No-op when param is NULL.
void taosCloseQset(taos_qset param) {
  if (param == NULL) return;
  STaosQset *qset = (STaosQset *)param;

  // remove all the queues from qset
  pthread_mutex_lock(&qset->mutex);
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
  pthread_mutex_unlock(&qset->mutex);

  // Log before the memory is released so the pointer is still valid here.
  uTrace("qset:%p is closed", qset);

  pthread_mutex_destroy(&qset->mutex);
  tsem_destroy(&qset->sem);
  free(qset);
}

// Post 'qset->sem' so that a reader thread waiting on it resumes execution
// and returns. Should only be used to signal the thread to exit.
void taosQsetThreadResume(taos_qset param) {
  STaosQset *qset = (STaosQset *)param;
  uDebug("qset:%p, it will exit", qset);
  tsem_post(&qset->sem);
}

// Insert a queue at the head of the set's queue list and record `ahandle`
// on the queue. The queue's current item count is added to the set's counter.
// Returns 0 on success, -1 when the queue already belongs to a set.
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
  STaosQueue *queue = (STaosQueue *)p2;
  STaosQset  *qset = (STaosQset *)p1;

  if (queue->qset) return -1;

  pthread_mutex_lock(&qset->mutex);

  queue->next = qset->head;
  queue->ahandle = ahandle;
  qset->head = queue;
  qset->numOfQueues++;

  pthread_mutex_lock(&queue->mutex);
  atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
  pthread_mutex_unlock(&queue->mutex);

  pthread_mutex_unlock(&qset->mutex);

  uTrace("queue:%p is added into qset:%p", queue, qset);
  return 0;
}

// Unlink a queue from the set if it is a member, subtracting the queue's
// item count from the set's counter. Does nothing (besides tracing) when the
// queue is not found in the set.
void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
  STaosQueue *queue = (STaosQueue *)p2;
  STaosQset  *qset = (STaosQset *)p1;

  STaosQueue *tqueue = NULL;

  pthread_mutex_lock(&qset->mutex);

  if (qset->head) {
    if (qset->head == queue) {
      qset->head = qset->head->next;
      tqueue = queue;
    } else {
      // Walk the list keeping a trailing pointer so the match can be unlinked.
      STaosQueue *prev = qset->head;
      tqueue = qset->head->next;
      while (tqueue) {
        assert(tqueue->qset);
        if (tqueue == queue) {
          prev->next = tqueue->next;
          break;
        } else {
          prev = tqueue;
          tqueue = tqueue->next;
        }
      }
    }

    // tqueue is non-NULL only when the queue was found and unlinked above.
    if (tqueue) {
      // Keep the round-robin cursor off the queue being removed.
      if (qset->current == queue) qset->current = tqueue->next;
      qset->numOfQueues--;

      pthread_mutex_lock(&queue->mutex);
      atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
      queue->qset = NULL;
      queue->next = NULL;
      pthread_mutex_unlock(&queue->mutex);
    }
  }

  pthread_mutex_unlock(&qset->mutex);

  uTrace("queue:%p is removed from qset:%p", queue, qset);
}

// Returns the number of queues currently in the set (read without locking).
int taosGetQueueNumber(taos_qset param) {
  return ((STaosQset *)param)->numOfQueues;
}

// Wait on the set's semaphore, then scan the member queues in round-robin
// order (at most numOfQueues of them) and pop one item from the first
// non-empty queue found.
// On success returns 1, stores the item in *pitem and, when the respective
// out-pointers are non-NULL, the queue's ahandle and itemFp.
// Returns 0 when no item was found.
int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProcessItem *itemFp) {
  STaosQset  *qset = (STaosQset *)param;
  STaosQnode *pNode = NULL;
  int         code = 0;

  tsem_wait(&qset->sem);

  pthread_mutex_lock(&qset->mutex);

  for (int i = 0; i < qset->numOfQueues; ++i) {
    // Wrap the cursor around to the head of the list.
    if (qset->current == NULL) qset->current = qset->head;
    STaosQueue *queue = qset->current;
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
    // Cheap unlocked peek; re-checked under the queue mutex below.
    if (queue->head == NULL) continue;

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
      pNode = queue->head;
      *pitem = pNode->item;
      if (ahandle) *ahandle = queue->ahandle;
      if (itemFp) *itemFp = queue->itemFp;
      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      queue->numOfItems--;
      atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
      uTrace("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems);
    }

    pthread_mutex_unlock(&queue->mutex);
    if (pNode) break;
  }

  pthread_mutex_unlock(&qset->mutex);

  return code;
}

// Wait on the set's semaphore, then scan the member queues in round-robin
// order (at most numOfQueues of them) and move all items of the first
// non-empty queue found into the qall.
// Returns the number of items moved (0 when none was found). When the
// respective out-pointers are non-NULL, the queue's ahandle and itemsFp are
// stored through them.
int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FProcessItems *itemsFp) {
  STaosQset  *qset = (STaosQset *)param;
  STaosQueue *queue;
  STaosQall  *qall = (STaosQall *)p2;
  int         code = 0;

  tsem_wait(&qset->sem);
  pthread_mutex_lock(&qset->mutex);

  for (int i = 0; i < qset->numOfQueues; ++i) {
    // Wrap the cursor around to the head of the list.
    if (qset->current == NULL) qset->current = qset->head;
    queue = qset->current;
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
    // Cheap unlocked peek; re-checked under the queue mutex below.
    if (queue->head == NULL) continue;

    pthread_mutex_lock(&queue->mutex);

    if (queue->head) {
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      qall->itemSize = queue->itemSize;
      code = qall->numOfItems;
      if (ahandle) *ahandle = queue->ahandle;
      if (itemsFp) *itemsFp = queue->itemsFp;

      queue->head = NULL;
      queue->tail = NULL;
      queue->numOfItems = 0;
      atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
      // One semaphore count was consumed on entry; consume one more for each
      // additional item taken so the semaphore matches the remaining items.
      for (int j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem);
    }

    pthread_mutex_unlock(&queue->mutex);

    if (code != 0) break;
  }

  pthread_mutex_unlock(&qset->mutex);
  return code;
}

// Returns the number of items currently in the queue, or 0 when param is NULL.
int taosGetQueueItemsNumber(taos_queue param) {
  STaosQueue *queue = (STaosQueue *)param;
  if (!queue) return 0;

  int num;
  pthread_mutex_lock(&queue->mutex);
  num = queue->numOfItems;
  pthread_mutex_unlock(&queue->mutex);
  return num;
}

// Returns the set's item counter, or 0 when param is NULL.
int taosGetQsetItemsNumber(taos_qset param) {
  STaosQset *qset = (STaosQset *)param;
  if (!qset) return 0;

  int num = 0;
  pthread_mutex_lock(&qset->mutex);
  num = qset->numOfItems;
  pthread_mutex_unlock(&qset->mutex);
  return num;
}