提交 652f51fe 编写于 作者: S Shengliang Guan

fix: control rpc qitem memory

上级 50b66df3
...@@ -65,7 +65,7 @@ typedef struct STaosQnode { ...@@ -65,7 +65,7 @@ typedef struct STaosQnode {
STaosQnode *next; STaosQnode *next;
STaosQueue *queue; STaosQueue *queue;
int64_t timestamp; int64_t timestamp;
int32_t dataSize; int64_t dataSize;
int32_t size; int32_t size;
int8_t itype; int8_t itype;
int8_t reserved[3]; int8_t reserved[3];
...@@ -104,7 +104,7 @@ typedef struct STaosQall { ...@@ -104,7 +104,7 @@ typedef struct STaosQall {
STaosQueue *taosOpenQueue(); STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue); void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize); void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize);
void taosFreeQitem(void *pItem); void taosFreeQitem(void *pItem);
void taosWriteQitem(STaosQueue *queue, void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
......
...@@ -164,7 +164,7 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { ...@@ -164,7 +164,7 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
pRpc->pCont = NULL; pRpc->pCont = NULL;
dTrace("msg:%p, is created and will put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), pRpc->contLen);
int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
if (code != 0) { if (code != 0) {
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
......
...@@ -65,12 +65,12 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { ...@@ -65,12 +65,12 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
switch (qtype) { switch (qtype) {
case QUERY_QUEUE: case QUERY_QUEUE:
dTrace("msg:%p, is created and will put into qnode-query queue", pMsg); dTrace("msg:%p, is created and will put into qnode-query queue, len:%d", pMsg, pRpc->contLen);
taosWriteQitem(pMgmt->queryWorker.queue, pMsg); taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return 0; return 0;
case READ_QUEUE: case READ_QUEUE:
case FETCH_QUEUE: case FETCH_QUEUE:
dTrace("msg:%p, is created and will put into qnode-fetch queue", pMsg); dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen);
taosWriteQitem(pMgmt->fetchWorker.queue, pMsg); taosWriteQitem(pMgmt->fetchWorker.queue, pMsg);
return 0; return 0;
default: default:
......
...@@ -139,8 +139,8 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { ...@@ -139,8 +139,8 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SSnode *pSnode = pMgmt->pSnode; SSnode *pSnode = pMgmt->pSnode;
if (pSnode == NULL) { if (pSnode == NULL) {
dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(), dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, terrstr(),
TMSG_INFO(pMsg->msgType), qtype); TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL; pRpc->pCont = NULL;
......
...@@ -241,7 +241,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { ...@@ -241,7 +241,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
} }
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType)); dTrace("vgId:%d, msg:%p is created, type:%s len:%d", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType), pRpc->contLen);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
......
...@@ -145,7 +145,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -145,7 +145,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pMsg == NULL) goto _OVER; if (pMsg == NULL) goto _OVER;
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle, pRpc->contLen);
code = dmProcessNodeMsg(pWrapper, pMsg); code = dmProcessNodeMsg(pWrapper, pMsg);
......
...@@ -109,7 +109,7 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { ...@@ -109,7 +109,7 @@ int64_t taosQueueMemorySize(STaosQueue *queue) {
return memOfItems; return memOfItems;
} }
void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize) { void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) {
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -124,7 +124,9 @@ void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize) { ...@@ -124,7 +124,9 @@ void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize) {
if (itype == RPC_QITEM) { if (itype == RPC_QITEM) {
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
if (alloced > tsRpcQueueMemoryAllowed) { if (alloced > tsRpcQueueMemoryAllowed) {
uError("failed to alloc qitem, size:%d alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, tsRpcQueueMemoryUsed); uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
tsRpcQueueMemoryUsed);
atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
taosMemoryFree(pNode); taosMemoryFree(pNode);
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册