未验证 提交 07d11d2b 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #12370 from taosdata/fix/fix_fd_leak

fix(rpc): avoid fd leak
...@@ -21,15 +21,16 @@ typedef struct SCliConn { ...@@ -21,15 +21,16 @@ typedef struct SCliConn {
uv_connect_t connReq; uv_connect_t connReq;
uv_stream_t* stream; uv_stream_t* stream;
uv_write_t writeReq; uv_write_t writeReq;
void* hostThrd;
SConnBuffer readBuf;
void* data;
STransQueue cliMsgs;
queue conn;
uint64_t expireTime;
int hThrdIdx;
STransCtx ctx;
void* hostThrd;
int hThrdIdx;
SConnBuffer readBuf;
STransQueue cliMsgs;
queue conn;
uint64_t expireTime;
STransCtx ctx;
bool broken; // link broken or not bool broken; // link broken or not
ConnStatus status; // ConnStatus status; //
...@@ -157,13 +158,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); ...@@ -157,13 +158,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
transClearBuffer(&conn->readBuf); \ transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \ transFreeMsg(transContFromHead((char*)head)); \
tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \ tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \
while (T_REF_VAL_GET(conn) > 1) { \ if (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
if (T_REF_VAL_GET(conn) == 1) { \
transUnrefCliHandle(conn); \ transUnrefCliHandle(conn); \
} \ } \
destroyCmsg(pMsg); \ destroyCmsg(pMsg); \
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
return; \ return; \
} \ } \
} while (0) } while (0)
......
...@@ -35,7 +35,6 @@ typedef struct SSrvConn { ...@@ -35,7 +35,6 @@ typedef struct SSrvConn {
uv_timer_t pTimer; uv_timer_t pTimer;
queue queue; queue queue;
int persist; // persist connection or not
SConnBuffer readBuf; // read buf, SConnBuffer readBuf; // read buf,
int inType; int inType;
void* pTransInst; // rpc init void* pTransInst; // rpc init
...@@ -138,6 +137,7 @@ static void destroySmsg(SSrvMsg* smsg); ...@@ -138,6 +137,7 @@ static void destroySmsg(SSrvMsg* smsg);
// check whether already read complete packet // check whether already read complete packet
static SSrvConn* createConn(void* hThrd); static SSrvConn* createConn(void* hThrd);
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
static int reallocConnRefHandle(SSrvConn* conn);
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
...@@ -164,7 +164,7 @@ static void* transWorkerThread(void* arg); ...@@ -164,7 +164,7 @@ static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg); static void* transAcceptThread(void* arg);
// add handle loop // add handle loop
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName); static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
static bool addHandleToAcceptloop(void* arg); static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
...@@ -180,6 +180,7 @@ static bool addHandleToAcceptloop(void* arg); ...@@ -180,6 +180,7 @@ static bool addHandleToAcceptloop(void* arg);
srvMsg->msg = tmsg; \ srvMsg->msg = tmsg; \
srvMsg->type = Release; \ srvMsg->type = Release; \
srvMsg->pConn = conn; \ srvMsg->pConn = conn; \
reallocConnRefHandle(conn); \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \ return; \
} \ } \
...@@ -360,10 +361,14 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -360,10 +361,14 @@ void uvOnSendCb(uv_write_t* req, int status) {
tTrace("server conn %p data already was written on stream", conn); tTrace("server conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) { if (!transQueueEmpty(&conn->srvMsgs)) {
SSrvMsg* msg = transQueuePop(&conn->srvMsgs); SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
if (msg->type == Release && conn->status != ConnNormal) { // if (msg->type == Release && conn->status != ConnNormal) {
conn->status = ConnNormal; // conn->status = ConnNormal;
transUnrefSrvHandle(conn); // transUnrefSrvHandle(conn);
} // reallocConnRefHandle(conn);
// destroySmsg(msg);
// transQueueClear(&conn->srvMsgs);
// return;
//}
destroySmsg(msg); destroySmsg(msg);
// send second data, just use for push // send second data, just use for push
if (!transQueueEmpty(&conn->srvMsgs)) { if (!transQueueEmpty(&conn->srvMsgs)) {
...@@ -421,8 +426,15 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { ...@@ -421,8 +426,15 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1; pHead->msgType = pConn->inType + 1;
} else { } else {
pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType; if (smsg->type == Release) {
pHead->msgType = 0;
pConn->status = ConnNormal;
transUnrefSrvHandle(pConn);
} else {
pHead->msgType = pMsg->msgType;
}
} }
pHead->release = smsg->type == Release ? 1 : 0; pHead->release = smsg->type == Release ? 1 : 0;
pHead->code = htonl(pMsg->code); pHead->code = htonl(pMsg->code);
...@@ -517,7 +529,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -517,7 +529,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
int64_t refId = transMsg.refId; int64_t refId = transMsg.refId;
SExHandle* exh2 = uvAcquireExHandle(refId); SExHandle* exh2 = uvAcquireExHandle(refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("server handle %p except msg, ignore it", exh1); tTrace("server handle except msg %p, ignore it", exh1);
uvReleaseExHandle(refId); uvReleaseExHandle(refId);
destroySmsg(msg); destroySmsg(msg);
continue; continue;
...@@ -581,11 +593,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { ...@@ -581,11 +593,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (uv_accept(stream, (uv_stream_t*)cli) == 0) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
if (pObj->numOfWorkerReady < pObj->numOfThreads) { if (pObj->numOfWorkerReady < pObj->numOfThreads) {
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
pObj->numOfWorkerReady);
uv_close((uv_handle_t*)cli, NULL); uv_close((uv_handle_t*)cli, NULL);
return; return;
} }
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
wr->data = cli; wr->data = cli;
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
...@@ -681,14 +694,14 @@ void* transAcceptThread(void* arg) { ...@@ -681,14 +694,14 @@ void* transAcceptThread(void* arg) {
return NULL; return NULL;
} }
void uvOnPipeConnectionCb(uv_connect_t *connect, int status) { void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
if (status != 0) { if (status != 0) {
return; return;
} }
SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req); SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
} }
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) { static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
if (0 != uv_loop_init(pThrd->loop)) { if (0 != uv_loop_init(pThrd->loop)) {
return false; return false;
...@@ -787,6 +800,19 @@ static void destroyConn(SSrvConn* conn, bool clear) { ...@@ -787,6 +800,19 @@ static void destroyConn(SSrvConn* conn, bool clear) {
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
} }
} }
static int reallocConnRefHandle(SSrvConn* conn) {
uvReleaseExHandle(conn->refId);
uvRemoveExHandle(conn->refId);
// avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = uvAddExHandle(exh);
uvAcquireExHandle(exh->refId);
conn->refId = exh->refId;
return 0;
}
static void uvDestroyConn(uv_handle_t* handle) { static void uvDestroyConn(uv_handle_t* handle) {
SSrvConn* conn = handle->data; SSrvConn* conn = handle->data;
if (conn == NULL) { if (conn == NULL) {
...@@ -822,7 +848,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) { ...@@ -822,7 +848,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
ASSERT(status == 0); ASSERT(status == 0);
SServerObj* srv = container_of(handle, SServerObj, pipeListen); SServerObj* srv = container_of(handle, SServerObj, pipeListen);
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1)); ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe)); ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));
...@@ -859,7 +885,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -859,7 +885,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId()); snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
#else #else
char pipeName[PATH_MAX] = {0}; char pipeName[PATH_MAX] = {0};
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(),
taosGetSelfPthreadId());
#endif #endif
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
...@@ -874,7 +901,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -874,7 +901,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
thrd->pipe = &(srv->pipe[i][1]); // init read thrd->pipe = &(srv->pipe[i][1]); // init read
if (false == addHandleToWorkloop(thrd,pipeName)) { if (false == addHandleToWorkloop(thrd, pipeName)) {
goto End; goto End;
} }
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
...@@ -959,6 +986,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { ...@@ -959,6 +986,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn; SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
reallocConnRefHandle(conn);
if (!transQueuePush(&conn->srvMsgs, msg)) { if (!transQueuePush(&conn->srvMsgs, msg)) {
return; return;
} }
......
...@@ -35,7 +35,7 @@ class TDSimClient: ...@@ -35,7 +35,7 @@ class TDSimClient:
"tableIncStepPerVnode": "10000", "tableIncStepPerVnode": "10000",
"maxVgroupsPerDb": "1000", "maxVgroupsPerDb": "1000",
"sdbDebugFlag": "143", "sdbDebugFlag": "143",
"rpcDebugFlag": "135", "rpcDebugFlag": "143",
"tmrDebugFlag": "131", "tmrDebugFlag": "131",
"cDebugFlag": "135", "cDebugFlag": "135",
"udebugFlag": "135", "udebugFlag": "135",
...@@ -136,7 +136,7 @@ class TDDnode: ...@@ -136,7 +136,7 @@ class TDDnode:
"tsdbDebugFlag": "135", "tsdbDebugFlag": "135",
"mDebugFlag": "135", "mDebugFlag": "135",
"sdbDebugFlag": "135", "sdbDebugFlag": "135",
"rpcDebugFlag": "135", "rpcDebugFlag": "143",
"tmrDebugFlag": "131", "tmrDebugFlag": "131",
"cDebugFlag": "135", "cDebugFlag": "135",
"httpDebugFlag": "135", "httpDebugFlag": "135",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册