提交 7ad96ac2 编写于 作者: S Shengliang Guan

enh: control the memory of the rpc queue

上级 245f82ff
...@@ -69,7 +69,6 @@ extern int32_t tsNumOfQnodeFetchThreads; ...@@ -69,7 +69,6 @@ extern int32_t tsNumOfQnodeFetchThreads;
extern int32_t tsNumOfSnodeSharedThreads; extern int32_t tsNumOfSnodeSharedThreads;
extern int32_t tsNumOfSnodeUniqueThreads; extern int32_t tsNumOfSnodeUniqueThreads;
extern int64_t tsRpcQueueMemoryAllowed; extern int64_t tsRpcQueueMemoryAllowed;
extern int64_t tsRpcQueueMemoryUsed;
// monitor // monitor
extern bool tsEnableMonitor; extern bool tsEnableMonitor;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_UTIL_PROCESS_H_ #define _TD_UTIL_PROCESS_H_
#include "os.h" #include "os.h"
#include "tqueue.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -25,7 +26,7 @@ extern "C" { ...@@ -25,7 +26,7 @@ extern "C" {
typedef enum { PROC_FUNC_REQ = 1, PROC_FUNC_RSP, PROC_FUNC_REGIST, PROC_FUNC_RELEASE } EProcFuncType; typedef enum { PROC_FUNC_REQ = 1, PROC_FUNC_RSP, PROC_FUNC_REGIST, PROC_FUNC_RELEASE } EProcFuncType;
typedef struct SProcObj SProcObj; typedef struct SProcObj SProcObj;
typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcMallocFp)(int32_t contLen, EQItype itype);
typedef void *(*ProcFreeFp)(void *pCont); typedef void *(*ProcFreeFp)(void *pCont);
typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
EProcFuncType ftype); EProcFuncType ftype);
......
...@@ -48,18 +48,24 @@ typedef struct { ...@@ -48,18 +48,24 @@ typedef struct {
int32_t threadNum; int32_t threadNum;
} SQueueInfo; } SQueueInfo;
typedef enum {
DEF_QITEM = 0,
RPC_QITEM = 1,
} EQItype;
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem); typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);
typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems); typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
STaosQueue *taosOpenQueue(); STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue); void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
void *taosAllocateQitem(int32_t size); void *taosAllocateQitem(int32_t size, EQItype itype);
void taosFreeQitem(void *pItem); void taosFreeQitem(void *pItem);
void taosWriteQitem(STaosQueue *queue, void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
bool taosQueueEmpty(STaosQueue *queue); bool taosQueueEmpty(STaosQueue *queue);
int32_t taosQueueItemSize(STaosQueue *queue); int32_t taosQueueItemSize(STaosQueue *queue);
int64_t taosQueueMemorySize(STaosQueue *queue);
STaosQall *taosAllocateQall(); STaosQall *taosAllocateQall();
void taosFreeQall(STaosQall *qall); void taosFreeQall(STaosQall *qall);
...@@ -77,8 +83,8 @@ int32_t taosGetQueueNumber(STaosQset *qset); ...@@ -77,8 +83,8 @@ int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
void taosResetQsetThread(STaosQset *qset, void *pItem); void taosResetQsetThread(STaosQset *qset, void *pItem);
int32_t taosGetQueueItemsNumber(STaosQueue *queue);
int32_t taosGetQsetItemsNumber(STaosQset *qset); extern int64_t tsRpcQueueMemoryAllowed;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -316,7 +316,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { ...@@ -316,7 +316,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
void tmqAssignDelayedHbTask(void* param, void* tmrId) { void tmqAssignDelayedHbTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param; tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
*pTaskType = TMQ_DELAYED_TASK__HB; *pTaskType = TMQ_DELAYED_TASK__HB;
taosWriteQitem(tmq->delayedTask, pTaskType); taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -324,7 +324,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) { ...@@ -324,7 +324,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) {
void tmqAssignDelayedCommitTask(void* param, void* tmrId) { void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param; tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
*pTaskType = TMQ_DELAYED_TASK__COMMIT; *pTaskType = TMQ_DELAYED_TASK__COMMIT;
taosWriteQitem(tmq->delayedTask, pTaskType); taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -332,7 +332,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { ...@@ -332,7 +332,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
void tmqAssignDelayedReportTask(void* param, void* tmrId) { void tmqAssignDelayedReportTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param; tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
*pTaskType = TMQ_DELAYED_TASK__REPORT; *pTaskType = TMQ_DELAYED_TASK__REPORT;
taosWriteQitem(tmq->delayedTask, pTaskType); taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -848,7 +848,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -848,7 +848,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
} }
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper)); SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
...@@ -987,7 +987,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -987,7 +987,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tmqUpdateEp(tmq, head->epoch, &rsp); tmqUpdateEp(tmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
} else { } else {
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper)); SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
if (pWrapper == NULL) { if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1; code = -1;
......
...@@ -61,8 +61,6 @@ int32_t tsNumOfQnodeQueryThreads = 2; ...@@ -61,8 +61,6 @@ int32_t tsNumOfQnodeQueryThreads = 2;
int32_t tsNumOfQnodeFetchThreads = 2; int32_t tsNumOfQnodeFetchThreads = 2;
int32_t tsNumOfSnodeSharedThreads = 2; int32_t tsNumOfSnodeSharedThreads = 2;
int32_t tsNumOfSnodeUniqueThreads = 2; int32_t tsNumOfSnodeUniqueThreads = 2;
int64_t tsRpcQueueMemoryAllowed = 0;
int64_t tsRpcQueueMemoryUsed = 0;
// monitor // monitor
bool tsEnableMonitor = true; bool tsEnableMonitor = true;
......
...@@ -104,7 +104,7 @@ int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -104,7 +104,7 @@ int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
} }
static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
......
...@@ -102,7 +102,7 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -102,7 +102,7 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
} }
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
if (pMsg == NULL) { if (pMsg == NULL) {
return -1; return -1;
} }
......
...@@ -326,7 +326,7 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q ...@@ -326,7 +326,7 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
int32_t code = 0; int32_t code = 0;
if (pMsg != NULL) { if (pMsg != NULL) {
......
...@@ -79,7 +79,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe ...@@ -79,7 +79,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
needRelease = true; needRelease = true;
if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER; if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER; if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM)) == NULL) goto _OVER;
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER; if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
if (pWrapper->procType != DND_PROC_PARENT) { if (pWrapper->procType != DND_PROC_PARENT) {
......
...@@ -124,7 +124,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { ...@@ -124,7 +124,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
......
...@@ -92,7 +92,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { ...@@ -92,7 +92,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg); syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg);
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
STaosQueue *pMsgQ = queue; STaosQueue *pMsgQ = queue;
...@@ -360,7 +360,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -360,7 +360,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg); syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg);
SSyncIO *io = pParent; SSyncIO *io = pParent;
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp); taosWriteQitem(io->pMsgQ, pTemp);
} }
...@@ -420,7 +420,7 @@ static void syncIOTickQ(void *param, void *tmrId) { ...@@ -420,7 +420,7 @@ static void syncIOTickQ(void *param, void *tmrId) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg); syncPingReply2RpcMsg(pMsg, &rpcMsg);
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg); syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg);
taosWriteQitem(io->pMsgQ, pTemp); taosWriteQitem(io->pMsgQ, pTemp);
......
...@@ -114,7 +114,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char ...@@ -114,7 +114,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
......
...@@ -103,7 +103,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char ...@@ -103,7 +103,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
......
...@@ -258,8 +258,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea ...@@ -258,8 +258,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
int16_t headLen = CEIL8(rawHeadLen); int16_t headLen = CEIL8(rawHeadLen);
int32_t bodyLen = CEIL8(rawBodyLen); int32_t bodyLen = CEIL8(rawBodyLen);
void *pHead = (*mallocHeadFp)(headLen); void *pHead = (*mallocHeadFp)(headLen, RPC_QITEM);
void *pBody = (*mallocBodyFp)(bodyLen); void *pBody = (*mallocBodyFp)(bodyLen, RPC_QITEM);
if (pHead == NULL || pBody == NULL) { if (pHead == NULL || pBody == NULL) {
taosThreadMutexUnlock(&pQueue->mutex); taosThreadMutexUnlock(&pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
......
...@@ -18,20 +18,21 @@ ...@@ -18,20 +18,21 @@
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
int64_t tsRpcQueueMemoryAllowed = 0;
int64_t tsRpcQueueMemoryUsed = 0;
typedef struct STaosQnode STaosQnode; typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode { typedef struct STaosQnode {
STaosQnode *next; STaosQnode *next;
STaosQueue *queue; STaosQueue *queue;
int32_t size; int32_t size;
int32_t reserved; int8_t itype;
int8_t reserved[3];
char item[]; char item[];
} STaosQnode; } STaosQnode;
typedef struct STaosQueue { typedef struct STaosQueue {
int64_t memOfItems;
int32_t numOfItems;
int32_t threadId;
STaosQnode *head; STaosQnode *head;
STaosQnode *tail; STaosQnode *tail;
STaosQueue *next; // for queue set STaosQueue *next; // for queue set
...@@ -40,15 +41,17 @@ typedef struct STaosQueue { ...@@ -40,15 +41,17 @@ typedef struct STaosQueue {
FItem itemFp; FItem itemFp;
FItems itemsFp; FItems itemsFp;
TdThreadMutex mutex; TdThreadMutex mutex;
int64_t memOfItems;
int32_t numOfItems;
} STaosQueue; } STaosQueue;
typedef struct STaosQset { typedef struct STaosQset {
STaosQueue *head; STaosQueue *head;
STaosQueue *current; STaosQueue *current;
TdThreadMutex mutex; TdThreadMutex mutex;
tsem_t sem;
int32_t numOfQueues; int32_t numOfQueues;
int32_t numOfItems; int32_t numOfItems;
tsem_t sem;
} STaosQset; } STaosQset;
typedef struct STaosQall { typedef struct STaosQall {
...@@ -120,6 +123,8 @@ bool taosQueueEmpty(STaosQueue *queue) { ...@@ -120,6 +123,8 @@ bool taosQueueEmpty(STaosQueue *queue) {
} }
int32_t taosQueueItemSize(STaosQueue *queue) { int32_t taosQueueItemSize(STaosQueue *queue) {
if (queue == NULL) return 0;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
int32_t numOfItems = queue->numOfItems; int32_t numOfItems = queue->numOfItems;
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
...@@ -127,32 +132,51 @@ int32_t taosQueueItemSize(STaosQueue *queue) { ...@@ -127,32 +132,51 @@ int32_t taosQueueItemSize(STaosQueue *queue) {
} }
int64_t taosQueueMemorySize(STaosQueue *queue) { int64_t taosQueueMemorySize(STaosQueue *queue) {
if (queue == NULL) return 0;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
int64_t memOfItems = queue->memOfItems; int64_t memOfItems = queue->memOfItems;
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
return memOfItems; return memOfItems;
} }
void *taosAllocateQitem(int32_t size) { void *taosAllocateQitem(int32_t size, EQItype itype) {
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
pNode->size = size; pNode->size = size;
pNode->itype = itype;
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
uTrace("item:%p, node:%p is allocated", pNode->item, pNode); if (itype == RPC_QITEM) {
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size);
if (alloced > tsRpcQueueMemoryUsed) {
taosMemoryFree(pNode);
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
return NULL;
}
uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
} else {
uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
}
return (void *)pNode->item; return (void *)pNode->item;
} }
void taosFreeQitem(void *pItem) { void taosFreeQitem(void *pItem) {
if (pItem == NULL) return; if (pItem == NULL) return;
char *temp = pItem; STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
temp -= sizeof(STaosQnode); if (pNode->itype > 0) {
uTrace("item:%p, node:%p is freed", pItem, temp); int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size);
taosMemoryFree(temp); uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
} else {
uTrace("item:%p, node:%p is freed", pItem, pNode);
}
taosMemoryFree(pNode);
} }
void taosWriteQitem(STaosQueue *queue, void *pItem) { void taosWriteQitem(STaosQueue *queue, void *pItem) {
...@@ -203,7 +227,13 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { ...@@ -203,7 +227,13 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
return code; return code;
} }
STaosQall *taosAllocateQall() { return taosMemoryCalloc(1, sizeof(STaosQall)); } STaosQall *taosAllocateQall() {
STaosQall *qall = taosMemoryCalloc(1, sizeof(STaosQall));
if (qall != NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
}
return qall;
}
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); } void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
...@@ -458,23 +488,3 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) { ...@@ -458,23 +488,3 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) {
} }
taosThreadMutexUnlock(&qset->mutex); taosThreadMutexUnlock(&qset->mutex);
} }
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
if (!queue) return 0;
int32_t num;
taosThreadMutexLock(&queue->mutex);
num = queue->numOfItems;
taosThreadMutexUnlock(&queue->mutex);
return num;
}
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
if (!qset) return 0;
int32_t num = 0;
taosThreadMutexLock(&qset->mutex);
num = qset->numOfItems;
taosThreadMutexUnlock(&qset->mutex);
return num;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册