Commit 04679404 authored by dengyihao

refactor uv code

Parent affa8a7e
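
This commit renames the client connection cache to a connection pool (connCacheCreate / connCacheDestroy / getConnFromCache / addConnToCache become connPoolCreate / connPoolDestroy / getConnFromPool / addConnToPool), extracts per-thread setup and teardown into createThrdObj / destroyThrdObj, splits server-side handle registration into addHandleToWorkloop and addHandleToAcceptloop, and routes every failure in taosInitServer through a single End: cleanup path backed by a new destroyWorkThrd helper.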
@@ -44,7 +44,7 @@ typedef struct SCliThrdObj {
   uv_loop_t*      loop;
   uv_async_t*     cliAsync;  //
   uv_timer_t*     pTimer;
-  void*           cache;  // conn pool
+  void*           pool;  // conn pool
   queue           msg;
   pthread_mutex_t msgMtx;
   uint64_t        nextTimeout;  // next timeout
@@ -65,10 +65,10 @@ typedef struct SConnList {
 // conn pool
 // add expire timeout and capacity limit
-static void*     connCacheCreate(int size);
-static void*     connCacheDestroy(void* cache);
-static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
-static void      addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
+static void*     connPoolCreate(int size);
+static void*     connPoolDestroy(void* pool);
+static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
+static void      addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
 // register timer in each thread to clear expire conn
 static void clientTimeoutCb(uv_timer_t* handle);
@@ -90,8 +90,14 @@ static void clientConnDestroy(SCliConn* pConn);
 static void clientMsgDestroy(SCliMsg* pMsg);
+// thread obj
+static SCliThrdObj* createThrdObj();
+static void         destroyThrdObj(SCliThrdObj* pThrd);
+// thread
 static void* clientThread(void* arg);
+static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
 static void clientProcessData(SCliConn* conn) {
   STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
   SRpcInfo*      pRpc = pCtx->pRpc;
@@ -103,7 +109,7 @@ static void clientProcessData(SCliConn* conn) {
   (pRpc->cfp)(NULL, &rpcMsg, NULL);
   SCliThrdObj* pThrd = conn->hostThrd;
-  addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn);
+  addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
   if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) {
     uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
   }
@@ -111,15 +117,14 @@ static void clientProcessData(SCliConn* conn) {
   free(pCtx);
   // impl
 }
-static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
 static void clientTimeoutCb(uv_timer_t* handle) {
   SCliThrdObj* pThrd = handle->data;
   SRpcInfo*    pRpc = pThrd->pTransInst;
   int64_t      currentTime = pThrd->nextTimeout;
-  tDebug("timeout, try to remove expire conn from connCache");
-  SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL);
+  tDebug("timeout, try to remove expire conn from conn pool");
+  SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
   while (p != NULL) {
     while (!QUEUE_IS_EMPTY(&p->conn)) {
       queue* h = QUEUE_HEAD(&p->conn);
@@ -131,18 +136,18 @@ static void clientTimeoutCb(uv_timer_t* handle) {
         break;
       }
     }
-    p = taosHashIterate((SHashObj*)pThrd->cache, p);
+    p = taosHashIterate((SHashObj*)pThrd->pool, p);
   }
   pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
   uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
 }
-static void* connCacheCreate(int size) {
-  SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
-  return cache;
+static void* connPoolCreate(int size) {
+  SHashObj* pool = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
+  return pool;
 }
-static void* connCacheDestroy(void* cache) {
-  SConnList* connList = taosHashIterate((SHashObj*)cache, NULL);
+static void* connPoolDestroy(void* pool) {
+  SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
   while (connList != NULL) {
     while (!QUEUE_IS_EMPTY(&connList->conn)) {
       queue* h = QUEUE_HEAD(&connList->conn);
@@ -150,22 +155,22 @@ static void* connCacheDestroy(void* cache) {
       SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
       clientConnDestroy(c);
     }
-    connList = taosHashIterate((SHashObj*)cache, connList);
+    connList = taosHashIterate((SHashObj*)pool, connList);
   }
-  taosHashClear(cache);
+  taosHashClear(pool);
 }
-static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
+static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
   char key[128] = {0};
   tstrncpy(key, ip, strlen(ip));
   tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
-  SHashObj*  pCache = cache;
-  SConnList* plist = taosHashGet(pCache, key, strlen(key));
+  SHashObj*  pPool = pool;
+  SConnList* plist = taosHashGet(pPool, key, strlen(key));
   if (plist == NULL) {
     SConnList list;
-    taosHashPut(pCache, key, strlen(key), (void*)&list, sizeof(list));
-    plist = taosHashGet(pCache, key, strlen(key));
+    taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
+    plist = taosHashGet(pPool, key, strlen(key));
     QUEUE_INIT(&plist->conn);
   }
@@ -176,14 +181,14 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
   QUEUE_REMOVE(h);
   return QUEUE_DATA(h, SCliConn, conn);
 }
-static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
+static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
   char key[128] = {0};
   tstrncpy(key, ip, strlen(ip));
   tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
   SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
   conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
-  SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key));
+  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
   // list already create before
   assert(plist != NULL);
   QUEUE_PUSH(&plist->conn, &conn->conn);
@@ -327,7 +332,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
   et = taosGetTimestampUs();
   STransConnCtx* pCtx = pMsg->ctx;
-  SCliConn* conn = getConnFromCache(pThrd->cache, pCtx->ip, pCtx->port);
+  SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
   if (conn != NULL) {
     // impl later
     conn->data = pMsg;
@@ -378,9 +383,9 @@ static void clientAsyncCb(uv_async_t* handle) {
     SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
     clientHandleReq(pMsg, pThrd);
     count++;
-    if (count >= 2) {
-      tDebug("send batch size: %d", count);
-    }
+  }
+  if (count >= 2) {
+    tDebug("already process batch size: %d", count);
   }
 }
@@ -398,24 +403,8 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
   cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
   for (int i = 0; i < cli->numOfThreads; i++) {
-    SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
-    QUEUE_INIT(&pThrd->msg);
-    pthread_mutex_init(&pThrd->msgMtx, NULL);
-    pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
-    uv_loop_init(pThrd->loop);
-    pThrd->cliAsync = malloc(sizeof(uv_async_t));
-    uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
-    pThrd->cliAsync->data = pThrd;
-    pThrd->pTimer = malloc(sizeof(uv_timer_t));
-    uv_timer_init(pThrd->loop, pThrd->pTimer);
-    pThrd->pTimer->data = pThrd;
+    SCliThrdObj* pThrd = createThrdObj();
     pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
-    pThrd->cache = connCacheCreate(1);
     pThrd->pTransInst = shandle;
     int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
@@ -430,7 +419,30 @@ static void clientMsgDestroy(SCliMsg* pMsg) {
   // impl later
   free(pMsg);
 }
+static SCliThrdObj* createThrdObj() {
+  SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
+  QUEUE_INIT(&pThrd->msg);
+  pthread_mutex_init(&pThrd->msgMtx, NULL);
+  pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
+  uv_loop_init(pThrd->loop);
+  pThrd->cliAsync = malloc(sizeof(uv_async_t));
+  uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
+  pThrd->cliAsync->data = pThrd;
+  pThrd->pTimer = malloc(sizeof(uv_timer_t));
+  uv_timer_init(pThrd->loop, pThrd->pTimer);
+  pThrd->pTimer->data = pThrd;
+  pThrd->pool = connPoolCreate(1);
+  return pThrd;
+}
 static void destroyThrdObj(SCliThrdObj* pThrd) {
+  if (pThrd == NULL) {
+    return;
+  }
   pthread_join(pThrd->thread, NULL);
   pthread_mutex_destroy(&pThrd->msgMtx);
   free(pThrd->cliAsync);
......
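
The client half above implements a per-endpoint pool: idle connections are parked under an "ip:port" key with an expire timestamp, handed back on the next request to the same endpoint, and swept by a uv_timer that fires at half the idle interval. Below is a minimal, self-contained sketch of that scheme; the names are illustrative, a plain linked list stands in for the real SHashObj of queues, and an ordinary function stands in for the timer callback.

/* Sketch of the conn-pool idea from the diff above. Illustrative names;
 * not the TDengine API. */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

typedef struct Conn {
  int          fd;         /* stand-in for the real uv_tcp_t handle */
  long long    expireTime; /* ms timestamp after which the conn is stale */
  struct Conn* next;
} Conn;

typedef struct Pool {
  char  key[128]; /* "ip:port", cf. the tstrncpy key building above */
  Conn* head;     /* idle connections for this endpoint */
} Pool;

static long long nowMs(void) { return (long long)time(NULL) * 1000; }

/* park an idle conn, refreshing its expire time (cf. addConnToPool) */
static void poolAdd(Pool* p, Conn* c, long long idleMs) {
  c->expireTime = nowMs() + idleMs;
  c->next = p->head;
  p->head = c;
}

/* reuse an idle conn if one exists (cf. getConnFromPool) */
static Conn* poolGet(Pool* p) {
  Conn* c = p->head;
  if (c != NULL) p->head = c->next;
  return c;
}

/* drop expired conns; the real code does this in clientTimeoutCb while
 * iterating the hash table, then re-arms the timer for idleTime / 2 */
static void poolSweep(Pool* p) {
  Conn** pp = &p->head;
  while (*pp != NULL) {
    if ((*pp)->expireTime <= nowMs()) {
      Conn* dead = *pp;
      *pp = dead->next;
      free(dead); /* cf. clientConnDestroy */
    } else {
      pp = &(*pp)->next;
    }
  }
}

int main(void) {
  Pool  p = {.key = "127.0.0.1:6030", .head = NULL};
  Conn* c = calloc(1, sizeof(Conn));
  c->fd = 3;
  poolAdd(&p, c, 60 * 1000);   /* park for one minute */
  Conn* reused = poolGet(&p);  /* the same conn comes back */
  printf("reused fd: %d\n", reused->fd);
  poolAdd(&p, reused, 0);      /* re-park, already expired */
  poolSweep(&p);               /* the sweep frees it */
  printf("pool empty: %s\n", p.head == NULL ? "yes" : "no");
  return 0;
}

Note that addConnToPool in the diff refreshes expireTime every time a connection is returned, so only connections that sit idle for a full CONN_PERSIST_TIME window get destroyed.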
@@ -91,10 +91,14 @@ static SConn* connCreate();
 static void connDestroy(SConn* conn);
 static void uvConnDestroy(uv_handle_t* handle);
-// server worke thread
+// server and worker thread
 static void* workerThread(void* arg);
 static void* acceptThread(void* arg);
+// add handle loop
+static bool addHandleToWorkloop(void* arg);
+static bool addHandleToAcceptloop(void* arg);
 void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
   /*
    * formate of data buffer:
@@ -268,7 +272,7 @@ static void uvProcessData(SConn* pConn) {
   rpcMsg.handle = pConn;
   (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
-  uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
+  // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
   // auth
   // validate msg type
 }
@@ -451,22 +455,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
 void* acceptThread(void* arg) {
   // opt
   SServerObj* srv = (SServerObj*)arg;
-  uv_tcp_init(srv->loop, &srv->server);
-  struct sockaddr_in bind_addr;
-  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
-  uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
-  int err = 0;
-  if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
-    tError("Listen error %s\n", uv_err_name(err));
-    return NULL;
-  }
   uv_run(srv->loop, UV_RUN_DEFAULT);
 }
-static void initWorkThrdObj(SWorkThrdObj* pThrd) {
+static bool addHandleToWorkloop(void* arg) {
+  SWorkThrdObj* pThrd = arg;
   pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
-  uv_loop_init(pThrd->loop);
+  if (0 != uv_loop_init(pThrd->loop)) {
+    return false;
+  }
   // SRpcInfo* pRpc = pThrd->shandle;
   uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
@@ -482,6 +478,31 @@ static void initWorkThrdObj(SWorkThrdObj* pThrd) {
   pThrd->workerAsync->data = pThrd;
   uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
+  return true;
+}
+static bool addHandleToAcceptloop(void* arg) {
+  // impl later
+  SServerObj* srv = arg;
+  int err = 0;
+  if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
+    tError("failed to init accept server: %s", uv_err_name(err));
+    return false;
+  }
+  struct sockaddr_in bind_addr;
+  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
+  if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
+    tError("failed to bind: %s", uv_err_name(err));
+    return false;
+  }
+  if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
+    tError("failed to listen: %s", uv_err_name(err));
+    return false;
+  }
+  return true;
 }
 void* workerThread(void* arg) {
   SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
@@ -546,11 +567,12 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
   for (int i = 0; i < srv->numOfThreads; i++) {
     SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
+    srv->pThreadObj[i] = thrd;
     srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
     int fds[2];
     if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
-      return NULL;
+      goto End;
     }
     uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
     uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write
@@ -559,7 +581,9 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
     thrd->fd = fds[0];
     thrd->pipe = &(srv->pipe[i][1]);  // init read
-    initWorkThrdObj(thrd);
+    if (false == addHandleToWorkloop(thrd)) {
+      goto End;
+    }
     int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
     if (err == 0) {
       tDebug("sucess to create worker-thread %d", i);
@@ -568,9 +592,10 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
       // TODO: clear all other resource later
       tError("failed to create worker-thread %d", i);
     }
-    srv->pThreadObj[i] = thrd;
   }
+  if (false == addHandleToAcceptloop(srv)) {
+    goto End;
+  }
   int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
   if (err == 0) {
     tDebug("success to create accept-thread");
@@ -579,16 +604,25 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
   }
   return srv;
+End:
+  taosCloseServer(srv);
+  return NULL;
+}
+void destroyWorkThrd(SWorkThrdObj* pThrd) {
+  if (pThrd == NULL) {
+    return;
+  }
+  pthread_join(pThrd->thread, NULL);
+  // free(srv->pipe[i]);
+  free(pThrd->loop);
+  free(pThrd);
 }
 void taosCloseServer(void* arg) {
   // impl later
   SServerObj* srv = arg;
   for (int i = 0; i < srv->numOfThreads; i++) {
-    SWorkThrdObj* pThrd = srv->pThreadObj[i];
-    pthread_join(pThrd->thread, NULL);
-    free(srv->pipe[i]);
-    free(pThrd->loop);
-    free(pThrd);
+    destroyWorkThrd(srv->pThreadObj[i]);
   }
   free(srv->loop);
   free(srv->pipe);
......
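
The server half converts taosInitServer's scattered return NULL exits into a single End: label that funnels every failure through taosCloseServer, which now tolerates partially built workers via destroyWorkThrd's NULL guard. Below is a minimal sketch of that single-exit init pattern, with hypothetical names: serverInit and stepA/stepB stand in for the real socketpair, work-loop, and listen steps.

/* Sketch of the goto-End cleanup pattern; hypothetical names. */
#include <stdbool.h>
#include <stdlib.h>

typedef struct Server {
  void* loop;    /* stand-in for uv_loop_t* */
  void* threads; /* stand-in for the worker-thread array */
} Server;

static void serverClose(Server* s); /* forward declaration */

static bool stepA(Server* s) { return (s->loop = malloc(64)) != NULL; }
static bool stepB(Server* s) { return (s->threads = malloc(64)) != NULL; }

Server* serverInit(void) {
  Server* s = calloc(1, sizeof(Server));
  if (s == NULL) return NULL;
  if (!stepA(s)) goto End; /* cf. goto End in taosInitServer */
  if (!stepB(s)) goto End;
  return s;
End:
  serverClose(s); /* one cleanup path, cf. taosCloseServer(srv) */
  return NULL;
}

/* teardown tolerates partial init: calloc zeroed the fields, and
 * free(NULL) is a no-op, cf. destroyWorkThrd's NULL guard */
static void serverClose(Server* s) {
  if (s == NULL) return;
  free(s->threads);
  free(s->loop);
  free(s);
}

int main(void) {
  Server* s = serverInit();
  serverClose(s); /* safe even if serverInit failed and returned NULL */
  return 0;
}

The payoff is that cleanup logic lives in one place and stays correct as more init steps are added; the cost is that every teardown routine must tolerate NULL and half-initialized state.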