未验证 提交 e26c474f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18076 from taosdata/fix/avoidAhandleleak

fix: avoid ahandle leak
......@@ -61,7 +61,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, rsp->uid);
tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs,
rsp->uid);
if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
......@@ -293,6 +294,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosThreadMutexUnlock(&appInfo.mutex);
tscError("cluster not exist, key:%s", key);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp);
return -1;
}
......@@ -322,6 +324,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
tFreeClientHbBatchRsp(&pRsp);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return code;
}
......
......@@ -96,7 +96,7 @@ typedef void* queue[2];
//#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
//#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
......
......@@ -25,7 +25,8 @@ typedef struct SCliConn {
uv_connect_t connReq;
uv_stream_t* stream;
queue wreqQueue;
uv_timer_t* timer;
uv_timer_t* timer; // read timer, forbidden
void* hostThrd;
......@@ -79,6 +80,7 @@ typedef struct SCliThrd {
uint64_t nextTimeout; // next timeout
void* pTransInst; //
void (*destroyAhandleFp)(void* ahandle);
SHashObj* fqdn2ipCache;
SCvtAddr cvtAddr;
......@@ -102,6 +104,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param);
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn
......@@ -155,6 +159,7 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq,
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg);
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
......@@ -258,16 +263,15 @@ static void* cliWorkThread(void* arg);
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
conn->ctx.freeFunc(msg->ctx->ahandle);
} else if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) {
} else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
pTransInst->destroyFp(msg->ctx->ahandle);
pThrd->destroyAhandleFp(msg->ctx->ahandle);
}
}
destroyCmsg(msg);
......@@ -407,9 +411,16 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
bool once = false;
do {
SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
if (pMsg == NULL && once) {
break;
}
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) {
destroyCmsg(pMsg);
break;
}
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg transMsg = {0};
......@@ -439,6 +450,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
continue;
}
}
if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
return;
......@@ -454,6 +466,19 @@ void cliHandleExcept(SCliConn* conn) {
cliHandleExceptImpl(conn, -1);
}
void cliConnTimeout(uv_timer_t* handle) {
SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
uv_timer_stop(handle);
handle->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleExceptImpl(conn, -1);
}
void cliReadTimeoutCb(uv_timer_t* handle) {
// set up timeout cb
SCliConn* conn = handle->data;
......@@ -545,7 +570,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->list->size >= 50) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = NULL;
arg->param2 = thrd;
STrans* pTransInst = thrd->pTransInst;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
......@@ -630,8 +655,16 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn;
conn->connReq.data = conn;
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) {
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
tDebug("no available timer, create a timer %p", timer);
uv_timer_init(pThrd->loop, timer);
}
timer->data = conn;
conn->timer = timer;
conn->connReq.data = conn;
transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL);
......@@ -661,8 +694,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
}
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
}
......@@ -811,6 +844,15 @@ _RETURN:
void cliConnCb(uv_connect_t* req, int status) {
// impl later
SCliConn* pConn = req->data;
SCliThrd* pThrd = pConn->hostThrd;
if (pConn->timer != NULL) {
uv_timer_stop(pConn->timer);
pConn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &pConn->timer);
pConn->timer = NULL;
}
if (status != 0) {
tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
cliHandleExcept(pConn);
......@@ -989,31 +1031,26 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret) {
tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret));
}
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT);
if (fd == -1) {
tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn);
cliHandleExcept(conn);
return;
}
uv_tcp_open((uv_tcp_t*)conn->stream, fd);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) {
tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
uv_err_name(ret));
uv_timer_stop(conn->timer);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleExcept(conn);
return;
}
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
}
STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s conn %p ready", pTransInst->label, conn);
......@@ -1136,6 +1173,8 @@ static void* cliWorkThread(void* arg) {
pThrd->pid = taosGetSelfPthreadId();
setThreadName("trans-cli-work");
uv_run(pThrd->loop, UV_RUN_DEFAULT);
tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
return NULL;
}
......@@ -1177,6 +1216,25 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
taosMemoryFree(pMsg);
}
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
if (param == NULL) return;
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
tDebug("destroy Ahandle A");
if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
tDebug("destroy Ahandle B");
pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
}
tDebug("destroy Ahandle C");
transDestroyConnCtx(pMsg->ctx);
destroyUserdata(&pMsg->msg);
taosMemoryFree(pMsg);
}
static SCliThrd* createThrdObj(void* trans) {
STrans* pTransInst = trans;
......@@ -1195,7 +1253,7 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->prepare->data = pThrd;
// uv_prepare_start(pThrd->prepare, cliPrepareCb);
int32_t timerSize = 512;
int32_t timerSize = 64;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
for (int i = 0; i < timerSize; i++) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
......@@ -1211,6 +1269,7 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
pThrd->pTransInst = trans;
pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->quit = false;
return pThrd;
......@@ -1226,9 +1285,10 @@ static void destroyThrdObj(SCliThrd* pThrd) {
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsg);
transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
transDQDestroy(pThrd->timeoutQueue, NULL);
tDebug("thread destroy %" PRId64, pThrd->pid);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
taosMemoryFree(timer);
......@@ -1254,7 +1314,18 @@ void cliSendQuit(SCliThrd* thrd) {
}
void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
uv_read_stop((uv_stream_t*)handle);
if (uv_handle_get_type(handle) == UV_TIMER) {
// SCliConn* pConn = handle->data;
// if (pConn != NULL && pConn->timer != NULL) {
// SCliThrd* pThrd = pConn->hostThrd;
// uv_timer_stop((uv_timer_t*)handle);
// handle->data = NULL;
// taosArrayPush(pThrd->timerList, &pConn->timer);
// pConn->timer = NULL;
// }
} else {
uv_read_stop((uv_stream_t*)handle);
}
uv_close(handle, cliDestroy);
}
}
......
......@@ -497,7 +497,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
SDelayTask* task = container_of(minNode, SDelayTask, node);
STaskArg* arg = task->arg;
if (freeFunc) freeFunc(arg->param1);
if (freeFunc) freeFunc(arg);
taosMemoryFree(arg);
taosMemoryFree(task);
......
......@@ -435,7 +435,7 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
taosGetTimeOfDay(&timeSecs);
time_t curTime = timeSecs.tv_sec;
ptm = taosLocalTimeNolock(&Tm, &curTime, taosGetDaylight());
ptm = taosLocalTime(&curTime, &Tm);
return sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %08" PRId64 " %s", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId(), flags);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册