未验证 提交 b09791d4 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #14476 from taosdata/fix/rpcInvalidRead

fix: avoid invalid read
...@@ -87,8 +87,8 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) { ...@@ -87,8 +87,8 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
static void dmCleanupProcQueue(SProcQueue *queue) {} static void dmCleanupProcQueue(SProcQueue *queue) {}
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) { static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
const void *pHead = pMsg; const void * pHead = pMsg;
const void *pBody = pMsg->pCont; const void * pBody = pMsg->pCont;
const int16_t rawHeadLen = sizeof(SRpcMsg); const int16_t rawHeadLen = sizeof(SRpcMsg);
const int32_t rawBodyLen = pMsg->contLen; const int32_t rawBodyLen = pMsg->contLen;
const int16_t headLen = CEIL8(rawHeadLen); const int16_t headLen = CEIL8(rawHeadLen);
...@@ -257,7 +257,7 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) { ...@@ -257,7 +257,7 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
proc->wrapper = pWrapper; proc->wrapper = pWrapper;
proc->name = pWrapper->name; proc->name = pWrapper->name;
SShm *shm = &proc->shm; SShm * shm = &proc->shm;
int32_t cstart = 0; int32_t cstart = 0;
int32_t csize = CEIL8(shm->size / 2); int32_t csize = CEIL8(shm->size / 2);
int32_t pstart = csize; int32_t pstart = csize;
...@@ -281,13 +281,13 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) { ...@@ -281,13 +281,13 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
} }
static void *dmConsumChildQueue(void *param) { static void *dmConsumChildQueue(void *param) {
SProc *proc = param; SProc * proc = param;
SMgmtWrapper *pWrapper = proc->wrapper; SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->cqueue; SProcQueue * queue = proc->cqueue;
int32_t numOfMsgs = 0; int32_t numOfMsgs = 0;
int32_t code = 0; int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ; EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pMsg = NULL; SRpcMsg * pMsg = NULL;
dDebug("node:%s, start to consume from cqueue", proc->name); dDebug("node:%s, start to consume from cqueue", proc->name);
do { do {
...@@ -324,13 +324,13 @@ static void *dmConsumChildQueue(void *param) { ...@@ -324,13 +324,13 @@ static void *dmConsumChildQueue(void *param) {
} }
static void *dmConsumParentQueue(void *param) { static void *dmConsumParentQueue(void *param) {
SProc *proc = param; SProc * proc = param;
SMgmtWrapper *pWrapper = proc->wrapper; SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->pqueue; SProcQueue * queue = proc->pqueue;
int32_t numOfMsgs = 0; int32_t numOfMsgs = 0;
int32_t code = 0; int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ; EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pMsg = NULL; SRpcMsg * pMsg = NULL;
dDebug("node:%s, start to consume from pqueue", proc->name); dDebug("node:%s, start to consume from pqueue", proc->name);
do { do {
...@@ -353,7 +353,7 @@ static void *dmConsumParentQueue(void *param) { ...@@ -353,7 +353,7 @@ static void *dmConsumParentQueue(void *param) {
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
} else if (ftype == DND_FUNC_RELEASE) { } else if (ftype == DND_FUNC_RELEASE) {
dmRemoveProcRpcHandle(proc, pMsg->info.handle); dmRemoveProcRpcHandle(proc, pMsg->info.handle);
rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code); rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
} else { } else {
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype); dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
......
...@@ -245,7 +245,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { ...@@ -245,7 +245,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
SRpcMsg msg = {.code = type, .info = *pHandle}; SRpcMsg msg = {.code = type, .info = *pHandle};
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE); dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
} else { } else {
rpcReleaseHandle(pHandle->handle, type); rpcReleaseHandle(pHandle, type);
} }
} }
......
...@@ -1029,9 +1029,9 @@ void transUnrefSrvHandle(void* handle) { ...@@ -1029,9 +1029,9 @@ void transUnrefSrvHandle(void* handle) {
} }
void transReleaseSrvHandle(void* handle) { void transReleaseSrvHandle(void* handle) {
SExHandle* exh = handle; SRpcHandleInfo* info = handle;
int64_t refId = exh->refId; SExHandle* exh = info->handle;
int64_t refId = info->refId;
ASYNC_CHECK_HANDLE(exh, refId); ASYNC_CHECK_HANDLE(exh, refId);
SWorkThrd* pThrd = exh->pThrd; SWorkThrd* pThrd = exh->pThrd;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册