diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 8c9003a9b2af371ebdb50620f53d382c04609a03..ecd9daf1bcd3b83803754017aec27c1ebe62becf 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -20,7 +20,7 @@ #include "tmsg.h" #include "tref.h" #include "trpc.h" - +// clang-format off int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t lastMsgType = pTask->lastMsgType; int32_t taskStatus = SCH_GET_TASK_STATUS(pTask); @@ -402,7 +402,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, code); if (pMsg) { - taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pData); } return TSDB_CODE_SUCCESS; } @@ -415,7 +415,7 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { if (head->isHbParam) { taosMemoryFree(pMsg->pData); - + SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL}; SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans)); @@ -1104,7 +1104,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, #if 1 SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; - schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); + code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); msg = NULL; SCH_ERR_JRET(code); @@ -1114,7 +1114,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, #else if (TDMT_VND_SUBMIT != msgType) { SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; - schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); + code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); msg = NULL; SCH_ERR_JRET(code); @@ -1136,3 +1136,4 @@ _return: taosMemoryFreeClear(msg); SCH_RET(code); } +// clang-format on diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 763483cbf6245448afa6d70e7952862ce88cbfee..7052b0b915137678d6aff528a26540a973cd74f5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1432,7 +1432,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran if (pThrd == NULL && valid == false) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return -1; + return TSDB_CODE_RPC_BROKEN_LINK; } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); @@ -1477,7 +1477,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs if (pThrd == NULL && valid == false) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return -1; + return TSDB_CODE_RPC_BROKEN_LINK; } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index db05aefe7b1930b286e3a2cc8aa92a251077939d..447db7613656613255369230138979a7596754a9 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -275,16 +275,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (pBuf->len <= TRANS_PACKET_LIMIT) { while (transReadComplete(pBuf)) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - if (pBuf->invalid) { - tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn); + if (true == pBuf->invalid || false == uvHandleReq(conn)) { + tError("%s conn %p read invalid packet", transLabel(pTransInst), conn); destroyConn(conn, true); return; - } else { - if (false == uvHandleReq(conn)) break; } } return; } else { + tError("%s conn %p read invalid packet, exceed limit", transLabel(pTransInst), conn); destroyConn(conn, true); return; }