未验证 提交 f14511e5 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #15319 from taosdata/feat/removeIdleConn

feat: remove idle conn
......@@ -396,6 +396,7 @@ typedef struct SDelayQueue {
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
void transDQCancel(SDelayQueue* queue, SDelayTask* task);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
/*
......
......@@ -27,7 +27,6 @@ typedef struct SCliConn {
SConnBuffer readBuf;
STransQueue cliMsgs;
queue q;
uint64_t expireTime;
STransCtx ctx;
bool broken; // link broken or not
......@@ -37,6 +36,7 @@ typedef struct SCliConn {
char* ip;
uint32_t port;
SDelayTask* task;
// debug and log info
struct sockaddr_in addr;
struct sockaddr_in localAddr;
......@@ -65,6 +65,7 @@ typedef struct SCliThrd {
queue msg;
TdThreadMutex msgMtx;
SDelayQueue* delayQueue;
SDelayQueue* timeoutQueue;
uint64_t nextTimeout; // next timeout
void* pTransInst; //
......@@ -92,9 +93,10 @@ static void* createConnPool(int size);
static void* destroyConnPool(void* pool);
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param);
// register timer in each thread to clear expire conn
static void cliTimeoutCb(uv_timer_t* handle);
// static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket
......@@ -184,7 +186,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
pThrd = (SCliThrd*)(exh)->pThrd; \
} \
} while (0)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
......@@ -384,10 +386,6 @@ void cliHandleResp(SCliConn* conn) {
}
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
// start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
}
void cliHandleExcept(SCliConn* pConn) {
......@@ -441,30 +439,30 @@ void cliHandleExcept(SCliConn* pConn) {
transUnrefCliHandle(pConn);
}
void cliTimeoutCb(uv_timer_t* handle) {
SCliThrd* pThrd = handle->data;
STrans* pTransInst = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout;
tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) {
while (!QUEUE_IS_EMPTY(&p->conn)) {
queue* h = QUEUE_HEAD(&p->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
if (c->expireTime < currentTime) {
QUEUE_REMOVE(h);
transUnrefCliHandle(c);
} else {
break;
}
}
p = taosHashIterate((SHashObj*)pThrd->pool, p);
}
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
}
// void cliTimeoutCb(uv_timer_t* handle) {
// SCliThrd* pThrd = handle->data;
// STrans* pTransInst = pThrd->pTransInst;
// int64_t currentTime = pThrd->nextTimeout;
// tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
//
// SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
// while (p != NULL) {
// while (!QUEUE_IS_EMPTY(&p->conn)) {
// queue* h = QUEUE_HEAD(&p->conn);
// SCliConn* c = QUEUE_DATA(h, SCliConn, q);
// if (c->expireTime < currentTime) {
// QUEUE_REMOVE(h);
// transUnrefCliHandle(c);
// } else {
// break;
// }
// }
// p = taosHashIterate((SHashObj*)pThrd->pool, p);
// }
//
// pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
// uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
// }
void* createConnPool(int size) {
// thread local, no lock
......@@ -506,6 +504,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
assert(h == &conn->q);
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
return conn;
}
static int32_t allocConnRef(SCliConn* conn, bool update) {
......@@ -537,6 +539,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
transReleaseExHandle(transGetRefMgt(), handle);
return 0;
}
static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
......@@ -547,7 +550,6 @@ static void addConnToPool(void* pool, SCliConn* conn) {
allocConnRef(conn, true);
STrans* pTransInst = thrd->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx);
......@@ -562,7 +564,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
assert(plist != NULL);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&plist->conn, &conn->q);
assert(!QUEUE_IS_EMPTY(&plist->conn));
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
......@@ -631,6 +639,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
uv_read_stop(conn->stream);
......@@ -997,6 +1007,8 @@ static SCliThrd* createThrdObj() {
pThrd->pool = createConnPool(4);
transDQCreate(pThrd->loop, &pThrd->delayQueue);
transDQCreate(pThrd->loop, &pThrd->timeoutQueue);
pThrd->quit = false;
return pThrd;
}
......@@ -1012,6 +1024,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsg);
transDQDestroy(pThrd->timeoutQueue, NULL);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
}
......@@ -1058,6 +1071,10 @@ static void doCloseIdleConn(void* param) {
STaskArg* arg = param;
SCliConn* conn = arg->param1;
SCliThrd* pThrd = arg->param2;
tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
conn->task = NULL;
cliDestroyConn(conn, true);
taosMemoryFree(arg);
}
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
......@@ -1248,11 +1265,17 @@ int transReleaseCliHandle(void* handle) {
if (pThrd == NULL) {
return -1;
}
STransMsg tmsg = {.info.handle = handle};
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg;
cmsg->type = Release;
STraceId* trace = &tmsg.info.traceId;
tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
return -1;
}
......
......@@ -480,7 +480,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
SDelayTask* task = container_of(minNode, SDelayTask, node);
STaskArg* arg = task->arg;
freeFunc(arg->param1);
if (freeFunc) freeFunc(arg->param1);
taosMemoryFree(arg);
taosMemoryFree(task);
......@@ -491,9 +491,16 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
uv_timer_stop(queue->timer);
if (heapSize(queue->heap) <= 0) return;
if (heapSize(queue->heap) <= 0) {
taosMemoryFree(task->arg);
taosMemoryFree(task);
return;
}
heapRemove(queue->heap, &task->node);
taosMemoryFree(task->arg);
taosMemoryFree(task);
if (heapSize(queue->heap) != 0) {
HeapNode* minNode = heapMin(queue->heap);
if (minNode != NULL) return;
......
......@@ -149,32 +149,34 @@ static void* transAcceptThread(void* arg);
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
reallocConnRef(conn); \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
reallocConnRef(conn); \
tTrace("conn %p received release request", conn); \
\
STraceId traceId = head->traceId; \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
\
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = NULL}; \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
#define SRV_RELEASE_UV(loop) \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册