提交 0753087e 编写于 作者: dengyihao's avatar dengyihao

start timer for particular msg

上级 66adfa9e
......@@ -66,12 +66,13 @@ typedef struct SCliThrd {
int64_t pid; // pid
uv_loop_t* loop;
SAsyncPool* asyncPool;
uv_idle_t* idle;
uv_prepare_t* prepare;
uv_timer_t timer;
void* pool; // conn pool
SArray* timerList;
// msg queue
queue msg;
TdThreadMutex msgMtx;
SDelayQueue* delayQueue;
......@@ -333,9 +334,14 @@ void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (uv_is_active((uv_handle_t*)conn->timer)) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
uv_timer_stop(conn->timer);
if (conn->timer) {
if (uv_is_active((uv_handle_t*)conn->timer)) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
uv_timer_stop(conn->timer);
}
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
}
STransMsgHead* pHead = NULL;
......@@ -468,7 +474,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
transUnrefCliHandle(pConn);
}
void cliHandleExcept(SCliConn* conn) {
tTrace("%s conn except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
cliHandleExceptImpl(conn, -1);
}
......@@ -632,11 +638,6 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
conn->connReq.data = conn;
// set read timeout
conn->timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, conn->timer);
conn->timer->data = conn;
transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL);
......@@ -653,13 +654,24 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
return conn;
}
static void cliDestroyConn(SCliConn* conn, bool clear) {
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
if (conn->task != NULL) {
transDQCancel(pThrd->timeoutQueue, conn->task);
conn->task = NULL;
}
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
uv_read_stop(conn->stream);
......@@ -671,17 +683,15 @@ static void cliDestroy(uv_handle_t* handle) {
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
return;
}
SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd;
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosMemoryFree(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
if (conn->task != NULL) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
}
transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip);
conn->stream->data = NULL;
......@@ -772,6 +782,15 @@ void cliSend(SCliConn* pConn) {
}
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
uv_timer_t* timer = taosArrayPop(pThrd->timerList);
if (timer == NULL) {
tDebug("no avaiable timer, create");
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, timer);
}
timer->data = pConn;
pConn->timer = timer;
tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
}
......@@ -792,8 +811,8 @@ void cliConnCb(uv_connect_t* req, int status) {
}
// int addrlen = sizeof(pConn->addr);
struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
int addrlen = sizeof(peername);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
transGetSockDebugInfo(&peername, pConn->dst);
......@@ -817,7 +836,6 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg);
destroyConnPool(pThrd->pool);
uv_timer_stop(&pThrd->timer);
uv_walk(pThrd->loop, cliWalkCb, NULL);
}
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
......@@ -890,8 +908,8 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
}
}
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pCtx->epSet)) {
......@@ -977,36 +995,6 @@ static void cliAsyncCb(uv_async_t* handle) {
}
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
}
static void cliIdleCb(uv_idle_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("do idle work");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
static void cliPrepareCb(uv_prepare_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("prepare work start");
......@@ -1096,19 +1084,20 @@ static SCliThrd* createThrdObj() {
uv_loop_init(pThrd->loop);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb);
uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd;
// pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t));
// uv_idle_init(pThrd->loop, pThrd->idle);
// pThrd->idle->data = pThrd;
// uv_idle_start(pThrd->idle, cliIdleCb);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
pThrd->prepare->data = pThrd;
uv_prepare_start(pThrd->prepare, cliPrepareCb);
int32_t timerSize = 512;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
for (int i = 0; i < timerSize; i++) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, timer);
taosArrayPush(pThrd->timerList, &timer);
}
pThrd->pool = createConnPool(4);
transDQCreate(pThrd->loop, &pThrd->delayQueue);
......@@ -1131,7 +1120,12 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->delayQueue, destroyCmsg);
transDQDestroy(pThrd->timeoutQueue, NULL);
taosMemoryFree(pThrd->idle);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
taosMemoryFree(timer);
}
taosArrayDestroy(pThrd->timerList);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册