提交 324de6e4 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last

...@@ -910,10 +910,10 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) { ...@@ -910,10 +910,10 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) {
} }
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) { int32_t delSize = taosArrayGetSize(pDelWins);
if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
return; return;
} }
int32_t delSize = taosArrayGetSize(pDelWins);
void* pIte = NULL; void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
SResKeyPos* pResKey = (SResKeyPos*)pIte; SResKeyPos* pResKey = (SResKeyPos*)pIte;
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
// clang-format off
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = pTask->lastMsgType; int32_t lastMsgType = pTask->lastMsgType;
int32_t taskStatus = SCH_GET_TASK_STATUS(pTask); int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
...@@ -1104,7 +1104,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1104,7 +1104,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
#if 1 #if 1
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL; msg = NULL;
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
...@@ -1114,7 +1114,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1114,7 +1114,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
#else #else
if (TDMT_VND_SUBMIT != msgType) { if (TDMT_VND_SUBMIT != msgType) {
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL; msg = NULL;
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
...@@ -1136,3 +1136,4 @@ _return: ...@@ -1136,3 +1136,4 @@ _return:
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
SCH_RET(code); SCH_RET(code);
} }
// clang-format on
...@@ -1432,7 +1432,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran ...@@ -1432,7 +1432,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
if (pThrd == NULL && valid == false) { if (pThrd == NULL && valid == false) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return -1; return TSDB_CODE_RPC_BROKEN_LINK;
} }
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
...@@ -1477,7 +1477,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs ...@@ -1477,7 +1477,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
if (pThrd == NULL && valid == false) { if (pThrd == NULL && valid == false) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return -1; return TSDB_CODE_RPC_BROKEN_LINK;
} }
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
......
...@@ -275,16 +275,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -275,16 +275,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if (pBuf->len <= TRANS_PACKET_LIMIT) { if (pBuf->len <= TRANS_PACKET_LIMIT) {
while (transReadComplete(pBuf)) { while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
if (pBuf->invalid) { if (true == pBuf->invalid || false == uvHandleReq(conn)) {
tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn); tError("%s conn %p read invalid packet", transLabel(pTransInst), conn);
destroyConn(conn, true); destroyConn(conn, true);
return; return;
} else {
if (false == uvHandleReq(conn)) break;
} }
} }
return; return;
} else { } else {
tError("%s conn %p read invalid packet, exceed limit", transLabel(pTransInst), conn);
destroyConn(conn, true); destroyConn(conn, true);
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册