diff --git a/include/common/tglobal.h b/include/common/tglobal.h index fd8cea449f0a8cd1b34ae91ed8072cc2b2f4ac1a..94b6ca551b361f632cbe06c6e81ca7bf26dd830f 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -51,6 +51,7 @@ extern int32_t tsVnodeShmSize; extern int32_t tsQnodeShmSize; extern int32_t tsSnodeShmSize; extern int32_t tsBnodeShmSize; +extern int32_t tsNumOfShmThreads; // queue & threads extern int32_t tsNumOfRpcThreads; @@ -67,6 +68,8 @@ extern int32_t tsNumOfQnodeQueryThreads; extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfSnodeSharedThreads; extern int32_t tsNumOfSnodeUniqueThreads; +extern int64_t tsRpcQueueMemoryAllowed; +extern int64_t tsRpcQueueMemoryUsed; // monitor extern bool tsEnableMonitor; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index aa2b32daab0d2733d2dcd398100d87e0f55815c1..b73d39090bfe2be5d8cecce0b6e5761bea7239ca 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -89,6 +89,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115) #define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116) #define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0117) +#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0118) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 70db65d50f850e1099032ddbde3e68913f1f5386..fc02ea2491a23d1f8c155c476b7f352e9c852f13 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -59,7 +59,7 @@ void taosFreeQitem(void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); bool taosQueueEmpty(STaosQueue *queue); -int32_t taosQueueSize(STaosQueue *queue); +int32_t taosQueueItemSize(STaosQueue *queue); STaosQall *taosAllocateQall(); void taosFreeQall(STaosQall *qall); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d82378c3157c3f5648f864329047e2d117b29d33..53b59fa47938e2b109e480245287c2079d3245ac 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -44,6 +44,7 @@ int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128; int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128; int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128; int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128; +int32_t tsNumOfShmThreads = 1; // queue & threads int32_t tsNumOfRpcThreads = 1; @@ -60,6 +61,8 @@ int32_t tsNumOfQnodeQueryThreads = 2; int32_t tsNumOfQnodeFetchThreads = 2; int32_t tsNumOfSnodeSharedThreads = 2; int32_t tsNumOfSnodeUniqueThreads = 2; +int64_t tsRpcQueueMemoryAllowed = 0; +int64_t tsRpcQueueMemoryUsed = 0; // monitor bool tsEnableMonitor = true; @@ -374,6 +377,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4); @@ -427,6 +431,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4); if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1; + tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; + tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L); + if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, 1, INT64_MAX, 0) != 0) return -1; + if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1; if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; @@ -566,6 +574,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; + tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index c95a4e8292cbf28c573c0f6e6eb8c47ce80c3c66..40b91cb93ef83dc948f5767ca1bc3d7586309741 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -126,10 +126,10 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { switch (qtype) { case QUERY_QUEUE: - size = taosQueueSize(pMgmt->queryWorker.queue); + size = taosQueueItemSize(pMgmt->queryWorker.queue); break; case FETCH_QUEUE: - size = taosQueueSize(pMgmt->fetchWorker.queue); + size = taosQueueItemSize(pMgmt->fetchWorker.queue); break; default: break; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index cf8f4d5010e425d1c98b4f55d0a9a3d5287a523f..1f671179b6b04f1f6c9d5e3eba898d19781d9cd3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -397,22 +397,22 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { if (pVnode != NULL) { switch (qtype) { case WRITE_QUEUE: - size = taosQueueSize(pVnode->pWriteQ); + size = taosQueueItemSize(pVnode->pWriteQ); break; case SYNC_QUEUE: - size = taosQueueSize(pVnode->pSyncQ); + size = taosQueueItemSize(pVnode->pSyncQ); break; case APPLY_QUEUE: - size = taosQueueSize(pVnode->pApplyQ); + size = taosQueueItemSize(pVnode->pApplyQ); break; case QUERY_QUEUE: - size = taosQueueSize(pVnode->pQueryQ); + size = taosQueueItemSize(pVnode->pQueryQ); break; case FETCH_QUEUE: - size = taosQueueSize(pVnode->pFetchQ); + size = taosQueueItemSize(pVnode->pFetchQ); break; case MERGE_QUEUE: - size = taosQueueSize(pVnode->pMergeQ); + size = taosQueueItemSize(pVnode->pMergeQ); break; default: break; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index a217701471c07b1b2b86b25de7bc89ac9f400a45..1d371d43f6738f7b93f6cdbfcc68ad08263e1776 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -88,9 +88,9 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery; - if (taosQueueSize(pDispatcher->pDataBlocks) > capacity) { + if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) { qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity, - taosQueueSize(pDispatcher->pDataBlocks)); + taosQueueItemSize(pDispatcher->pDataBlocks)); return false; } @@ -106,7 +106,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { taosThreadMutexLock(&pDispatcher->mutex); - int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks); + int32_t blockNums = taosQueueItemSize(pDispatcher->pDataBlocks); int32_t status = (0 == blockNums ? DS_BUF_EMPTY : (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a1bc37e6cd1231a6f5486d526756beb5916f45e8..6c0a9b132480944c082c4c7e0e8e4abd9ac4727d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found") TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization") TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash") TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") +TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 3c3a8460b96041538c3a625812982c391c91bdf5..0d96dcda0a19aaeaa3fd01c1ac160b430f25ca5d 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -23,36 +23,36 @@ typedef struct STaosQnode STaosQnode; typedef struct STaosQnode { STaosQnode *next; STaosQueue *queue; + int32_t size; char item[]; } STaosQnode; typedef struct STaosQueue { - int32_t itemSize; - int32_t numOfItems; - int32_t threadId; - STaosQnode *head; - STaosQnode *tail; - STaosQueue *next; // for queue set - STaosQset *qset; // for queue set - void *ahandle; // for queue set - FItem itemFp; - FItems itemsFp; + int32_t memOfItems; + int32_t numOfItems; + int32_t threadId; + STaosQnode *head; + STaosQnode *tail; + STaosQueue *next; // for queue set + STaosQset *qset; // for queue set + void *ahandle; // for queue set + FItem itemFp; + FItems itemsFp; TdThreadMutex mutex; } STaosQueue; typedef struct STaosQset { - STaosQueue *head; - STaosQueue *current; + STaosQueue *head; + STaosQueue *current; TdThreadMutex mutex; - int32_t numOfQueues; - int32_t numOfItems; - tsem_t sem; + int32_t numOfQueues; + int32_t numOfItems; + tsem_t sem; } STaosQset; typedef struct STaosQall { STaosQnode *current; STaosQnode *start; - int32_t itemSize; int32_t numOfItems; } STaosQall; @@ -118,15 +118,23 @@ bool taosQueueEmpty(STaosQueue *queue) { return empty; } -int32_t taosQueueSize(STaosQueue *queue) { +int32_t taosQueueItemSize(STaosQueue *queue) { taosThreadMutexLock(&queue->mutex); int32_t numOfItems = queue->numOfItems; taosThreadMutexUnlock(&queue->mutex); return numOfItems; } +int32_t taosQueueMemorySize(STaosQueue *queue) { + taosThreadMutexLock(&queue->mutex); + int32_t memOfItems = queue->memOfItems; + taosThreadMutexUnlock(&queue->mutex); + return memOfItems; +} + void *taosAllocateQitem(int32_t size) { STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); + pNode->size = size; if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -161,8 +169,9 @@ void taosWriteQitem(STaosQueue *queue, void *pItem) { } queue->numOfItems++; + queue->memOfItems += pNode->size; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems); + uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems); taosThreadMutexUnlock(&queue->mutex); @@ -181,9 +190,11 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; + queue->memOfItems -= pNode->size; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; - uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); + uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, + queue->memOfItems); } taosThreadMutexUnlock(&queue->mutex); @@ -207,12 +218,12 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { qall->current = queue->head; qall->start = queue->head; qall->numOfItems = queue->numOfItems; - qall->itemSize = queue->itemSize; code = qall->numOfItems; queue->head = NULL; queue->tail = NULL; queue->numOfItems = 0; + queue->memOfItems = 0; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); } @@ -377,9 +388,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FI queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; + queue->memOfItems -= pNode->size; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; - uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); + uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, + queue->memOfItems); } taosThreadMutexUnlock(&queue->mutex); @@ -411,7 +424,6 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand qall->current = queue->head; qall->start = queue->head; qall->numOfItems = queue->numOfItems; - qall->itemSize = queue->itemSize; code = qall->numOfItems; if (ahandle) *ahandle = queue->ahandle; if (itemsFp) *itemsFp = queue->itemsFp; @@ -419,6 +431,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand queue->head = NULL; queue->tail = NULL; queue->numOfItems = 0; + queue->memOfItems = 0; atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); for (int32_t j = 1; j < qall->numOfItems; ++j) { tsem_wait(&qset->sem);