未验证 提交 3b4802cc 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16496 from taosdata/feat/avoidClientDeadLock

avoid deadlock
......@@ -155,6 +155,8 @@ static void clientSentCb(uv_write_t* req, int32_t status) {
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to send data %s", uv_strerror(status));
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
return;
} else {
uTrace("http-report succ to send data");
}
......
......@@ -16,7 +16,7 @@
#include "transComm.h"
typedef struct SConnList {
queue conn;
queue conns;
int32_t size;
} SConnList;
......@@ -107,11 +107,11 @@ static void doCloseIdleConn(void* param);
static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn
// static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv
// alloc buffer for recv
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket
// callback after recv nbytes from socket
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after write data to socket
// callback after send data to socket
static void cliSendCb(uv_write_t* req, int status);
// callback after conn to server
static void cliConnCb(uv_connect_t* req, int status);
......@@ -129,19 +129,14 @@ static SCliConn* cliCreateConn(SCliThrd* thrd);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle);
static void cliSend(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
if (code != 0) return false;
if (pCtx->retryCnt == 0) return false;
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
return true;
}
// cli util func
static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
static int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
/*
* set TCP connection timeout per-socket level
*/
static int cliCreateSocket();
// process data read from server, add decompress etc later
static void cliHandleResp(SCliConn* conn);
// handle except about conn
......@@ -169,15 +164,14 @@ static void destroyThrdObj(SCliThrd* pThrd);
static void cliWalkCb(uv_handle_t* handle, void* arg);
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
SCliMsg* pMsg = NULL;
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
pMsg = transQueueGet(&conn->cliMsgs, i);
if (pMsg != NULL && pMsg->ctx != NULL) {
if (conn->ctx.freeFunc != NULL) {
conn->ctx.freeFunc(pMsg->ctx->ahandle);
SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
if (msg != NULL && msg->ctx != NULL) {
if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
conn->ctx.freeFunc(msg->ctx->ahandle);
}
}
destroyCmsg(pMsg);
destroyCmsg(msg);
}
}
#define CLI_RELEASE_UV(loop) \
......@@ -217,8 +211,10 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
} \
if (i == sz) { \
pMsg = NULL; \
tDebug("msg not found, %" PRIu64 "", ahandle); \
} else { \
pMsg = transQueueRm(&conn->cliMsgs, i); \
tDebug("msg found, %" PRIu64 "", ahandle); \
} \
} while (0)
#define CONN_GET_NEXT_SENDMSG(conn) \
......@@ -470,8 +466,8 @@ void* createConnPool(int size) {
void* destroyConnPool(void* pool) {
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conn)) {
queue* h = QUEUE_HEAD(&connList->conn);
while (!QUEUE_IS_EMPTY(&connList->conns)) {
queue* h = QUEUE_HEAD(&connList->conns);
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
cliDestroyConn(c, true);
}
......@@ -484,21 +480,21 @@ void* destroyConnPool(void* pool) {
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
char key[32] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SHashObj* pPool = pool;
SConnList* plist = taosHashGet(pPool, key, strlen(key));
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
if (plist == NULL) {
SConnList list = {0};
taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pPool, key, strlen(key));
QUEUE_INIT(&plist->conn);
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet((SHashObj*)pool, key, strlen(key));
QUEUE_INIT(&plist->conns);
}
if (QUEUE_IS_EMPTY(&plist->conn)) {
if (QUEUE_IS_EMPTY(&plist->conns)) {
return NULL;
}
plist->size -= 1;
queue* h = QUEUE_HEAD(&plist->conn);
queue* h = QUEUE_HEAD(&plist->conns);
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q);
......@@ -514,22 +510,21 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
}
SCliThrd* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd);
allocConnRef(conn, true);
SCliThrd* thrd = conn->hostThrd;
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosArrayPush(thrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
if (T_REF_VAL_GET(conn) > 1) {
transUnrefCliHandle(conn);
}
cliDestroyConnMsgs(conn, false);
STrans* pTransInst = thrd->pTransInst;
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx);
conn->status = ConnInPool;
if (conn->list == NULL) {
......@@ -540,18 +535,15 @@ static void addConnToPool(void* pool, SCliConn* conn) {
} else {
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
}
assert(conn->list != NULL);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&conn->list->conn, &conn->q);
QUEUE_PUSH(&conn->list->conns, &conn->q);
conn->list->size += 1;
conn->task = NULL;
assert(!QUEUE_IS_EMPTY(&conn->list->conn));
if (conn->list->size >= 50) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
STrans* pTransInst = thrd->pTransInst;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
}
......@@ -691,11 +683,10 @@ static void cliDestroy(uv_handle_t* handle) {
transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip);
conn->stream->data = NULL;
taosMemoryFree(conn->stream);
transCtxCleanup(&conn->ctx);
cliReleaseUnfinishedMsg(conn);
transQueueDestroy(&conn->cliMsgs);
cliDestroyConnMsgs(conn, true);
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
transReqQueueClear(&conn->wreqQueue);
transDestroyBuffer(&conn->readBuf);
......@@ -738,8 +729,6 @@ static void cliSendCb(uv_write_t* req, int status) {
}
void cliSend(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn);
assert(!transQueueEmpty(&pConn->cliMsgs));
SCliMsg* pCliMsg = NULL;
......@@ -756,8 +745,8 @@ void cliSend(SCliConn* pConn) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
}
int msgLen = transMsgLenFromCont(pMsg->contLen);
int msgLen = transMsgLenFromCont(pMsg->contLen);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
......@@ -769,8 +758,6 @@ void cliSend(SCliConn* pConn) {
pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
......@@ -792,6 +779,8 @@ void cliSend(SCliConn* pConn) {
tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
}
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return;
......@@ -807,7 +796,6 @@ void cliConnCb(uv_connect_t* req, int status) {
cliHandleExcept(pConn);
return;
}
// int addrlen = sizeof(pConn->addr);
struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
......@@ -840,7 +828,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
int64_t refId = (int64_t)(pMsg->msg.info.handle);
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
tDebug("%" PRId64 " already release", refId);
tDebug("%" PRId64 " already released", refId);
destroyCmsg(pMsg);
return;
}
......@@ -856,6 +844,9 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
return;
}
cliSend(conn);
} else {
tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
destroyCmsg(pMsg);
}
}
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
......@@ -905,6 +896,27 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
}
}
}
bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
if (code != 0) return false;
if (pCtx->retryCnt == 0) return false;
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
return true;
}
int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
if (pMsg == NULL) return -1;
memset(pResp, 0, sizeof(STransMsg));
pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
pResp->msgType = pMsg->msg.msgType + 1;
pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL;
pResp->info.traceId = pMsg->msg.info.traceId;
return 0;
}
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
......@@ -920,13 +932,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
if (ignore == true) {
// persist conn already release by server
STransMsg resp = {0};
resp.code = TSDB_CODE_RPC_BROKEN_LINK;
resp.msgType = pMsg->msg.msgType + 1;
resp.info.ahandle = pMsg && pMsg->ctx ? pMsg->ctx->ahandle : NULL;
resp.info.traceId = pMsg->msg.info.traceId;
STransMsg resp;
cliBuildExceptResp(pMsg, &resp);
pTransInst->cfp(pTransInst->parent, &resp, NULL);
destroyCmsg(pMsg);
return;
......@@ -991,9 +998,6 @@ static void cliAsyncCb(uv_async_t* handle) {
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
count++;
}
......@@ -1035,24 +1039,58 @@ static void cliPrepareCb(uv_prepare_t* handle) {
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
transCtxCleanup(&conn->ctx);
cliReleaseUnfinishedMsg(conn);
if (destroy == 1) {
transQueueDestroy(&conn->cliMsgs);
} else {
transQueueClear(&conn->cliMsgs);
}
}
void cliIteraConnMsgs(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
SCliMsg* cmsg = transQueueGet(&conn->cliMsgs, i);
if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
continue;
}
STransMsg resp = {0};
if (-1 == cliBuildExceptResp(cmsg, &resp)) {
continue;
}
pTransInst->cfp(pTransInst->parent, &resp, NULL);
cmsg->ctx->ahandle = NULL;
}
}
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
uint64_t ahandle = pHead->ahandle;
tDebug("ahandle = %" PRIu64 "", ahandle);
SCliMsg* pMsg = NULL;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
transClearBuffer(&conn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) {
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0);
if (cliMsg->type == Release) return true;
for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->cliMsgs); i++) {
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
if (cliMsg->type == Release) {
assert(pMsg == NULL);
return true;
}
}
cliIteraConnMsgs(conn);
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
if (T_REF_VAL_GET(conn) > 1) {
transUnrefCliHandle(conn);
}
destroyCmsg(pMsg);
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
return true;
}
......
......@@ -492,7 +492,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
// release handle to rpc init
if (msg->type == Quit) {
(*transAsyncHandle[msg->type])(msg, pThrd);
continue;
} else {
STransMsg transMsg = msg->msg;
......@@ -771,7 +770,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
// conn set
QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 1, pThrd, uvWorkerAsyncCb);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册