Commit affa8a7e authored by dengyihao

refactor rpc

Parent f577c418
@@ -17,6 +17,8 @@
 #include "transComm.h"
+
+#define CONN_PERSIST_TIME(para) (para * 1000 * 100)
 typedef struct SCliConn {
   uv_connect_t connReq;
   uv_stream_t* stream;
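Review note on the new macro: `para` is spliced into the expansion unparenthesized, so an expression argument binds to the multiplications incorrectly. Worth confirming, too, that the factor change is intentional: the macro multiplies by 1000 * 100, while the inline expressions it replaces at the call sites below used * 1000 * 10, so the effective persist window grows tenfold. A minimal sketch of the expansion hazard; the hardened variant is hypothetical, not part of this commit:

/* As committed: CONN_PERSIST_TIME(base + extra) expands to
 * (base + extra * 1000 * 100), scaling only `extra`. */
#define CONN_PERSIST_TIME(para) (para * 1000 * 100)

/* Hypothetical hardened form: parenthesize the parameter. */
#define CONN_PERSIST_TIME_SAFE(para) ((para) * 1000 * 100)

Every call site in this diff passes a plain field (pRpc->idleTime), so the hazard is latent rather than live.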
@@ -102,6 +104,9 @@ static void clientProcessData(SCliConn* conn) {
   SCliThrdObj* pThrd = conn->hostThrd;
   addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn);
+  if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) {
+    uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
+  }
   free(pCtx->ip);
   free(pCtx);
   // impl
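Review note: the sweep timer is armed lazily, only when a connection is actually parked in the cache, guarded by uv_is_active() so repeated cache inserts do not re-arm a running timer, and skipped entirely when idleTime is 0 (reaping disabled). pRpc is not declared in the visible context; presumably it is pThrd->pTransInst, fetched earlier in clientProcessData. With the macro above, the numbers work out as follows for an illustrative idleTime of 3 (the value is invented for the example):

/* Illustrative arithmetic only, not from the commit: */
int64_t persist = CONN_PERSIST_TIME(3); /* 3 * 1000 * 100 = 300000 ms */
int64_t sweep   = persist / 2;          /* timer fires every 150000 ms */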
@@ -112,6 +117,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
   SCliThrdObj* pThrd = handle->data;
   SRpcInfo* pRpc = pThrd->pTransInst;
   int64_t currentTime = pThrd->nextTimeout;
+  tDebug("timeout, try to remove expire conn from connCache");
   SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL);
   while (p != NULL) {
@@ -128,8 +134,8 @@ static void clientTimeoutCb(uv_timer_t* handle) {
     p = taosHashIterate((SHashObj*)pThrd->cache, p);
   }
-  pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
-  uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0);
+  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
+  uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
 }
 static void* connCacheCreate(int size) {
   SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
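Review note: the reschedule mirrors the lazy start above, and fixes a unit mismatch in the old code, which computed the deadline with * 1000 * 10 but armed the timer with a bare * 10; routing both through the macro makes deadline and period consistent. The latency bound this cadence buys, assuming expireTime is set as in addConnToCache below:

/* With persist = CONN_PERSIST_TIME(idleTime):
 *   a connection parked at time t has expireTime = t + persist;
 *   sweeps run every persist / 2, so the first sweep at or after
 *   expiry lands in [t + persist, t + 1.5 * persist), which bounds
 *   how long an idle connection can outlive its deadline. */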
@@ -158,8 +164,7 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
   SConnList* plist = taosHashGet(pCache, key, strlen(key));
   if (plist == NULL) {
     SConnList list;
-    plist = &list;
-    taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist));
+    taosHashPut(pCache, key, strlen(key), (void*)&list, sizeof(list));
     plist = taosHashGet(pCache, key, strlen(key));
     QUEUE_INIT(&plist->conn);
   }
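Review note: a cleanup rather than a behavior change. The old form aliased a stack local through plist before handing it to taosHashPut, which made it look as if the table might end up holding a stack address; the rewrite passes the local by address once, relies on taosHashPut copying the bytes, and then re-reads the copy that actually lives in the table before QUEUE_INIT runs. A minimal sketch of the pattern, with names taken from the diff and the SConnList layout assumed:

SConnList stub; /* stack temporary; its contents are irrelevant */
taosHashPut(pCache, key, strlen(key), (void*)&stub, sizeof(stub)); /* table stores its own copy */
SConnList* stored = taosHashGet(pCache, key, strlen(key));         /* pointer into that copy */
QUEUE_INIT(&stored->conn); /* initialize the queue head in place, never on the stack */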
@@ -177,7 +182,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
   tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
   SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
-  conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
+  conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
   SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key));
   // list already create before
   assert(plist != NULL);
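Review note on an unchanged line worth flagging while this function is in review: the key appends the raw bytes of port with tstrncpy, a string copy that stops at the first zero byte. On a little-endian machine, port 80 is stored as {0x50, 0x00}, so only one byte survives into the key, and a port whose low byte is zero contributes nothing at all. Lookups still succeed because getConnFromCache builds its key the same way, but distinct ip/port pairs can in principle collide. A hypothetical fixed-width alternative, not in the commit:

size_t klen = strlen(key);
memcpy(key + klen, &port, sizeof(port)); /* keep all port bytes, zeros included */
/* ...then pass klen + sizeof(port) as the key length instead of strlen(key). */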
@@ -374,14 +379,13 @@ static void clientAsyncCb(uv_async_t* handle) {
     clientHandleReq(pMsg, pThrd);
     count++;
     if (count >= 2) {
-      tError("send batch size: %d", count);
+      tDebug("send batch size: %d", count);
     }
   }
 }
 static void* clientThread(void* arg) {
   SCliThrdObj* pThrd = (SCliThrdObj*)arg;
   uv_run(pThrd->loop, UV_RUN_DEFAULT);
-
 }
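Review note on another unchanged spot: clientThread is declared to return void* but falls off the end without a return statement, which is undefined behavior in C when the caller reads the result, and pthread_join does. A suggested form, not part of the commit:

static void* clientThread(void* arg) {
  SCliThrdObj* pThrd = (SCliThrdObj*)arg;
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
  return NULL; /* make the void* contract explicit */
}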
@@ -409,8 +413,8 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
   pThrd->pTimer = malloc(sizeof(uv_timer_t));
   uv_timer_init(pThrd->loop, pThrd->pTimer);
   pThrd->pTimer->data = pThrd;
-  pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
+  pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
   pThrd->cache = connCacheCreate(1);
   pThrd->pTransInst = shandle;
@@ -426,16 +430,19 @@ static void clientMsgDestroy(SCliMsg* pMsg) {
   // impl later
   free(pMsg);
 }
+static void destroyThrdObj(SCliThrdObj* pThrd) {
+  pthread_join(pThrd->thread, NULL);
+  pthread_mutex_destroy(&pThrd->msgMtx);
+  free(pThrd->cliAsync);
+  free(pThrd->loop);
+  free(pThrd);
+}
+//
 void taosCloseClient(void* arg) {
   // impl later
   SClientObj* cli = arg;
   for (int i = 0; i < cli->numOfThreads; i++) {
-    SCliThrdObj* pThrd = cli->pThreadObj[i];
-    pthread_join(pThrd->thread, NULL);
-    pthread_mutex_destroy(&pThrd->msgMtx);
-    free(pThrd->cliAsync);
-    free(pThrd->loop);
-    free(pThrd);
+    destroyThrdObj(cli->pThreadObj[i]);
   }
   free(cli->pThreadObj);
   free(cli);
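Review note: hoisting the per-thread teardown into destroyThrdObj pairs it cleanly with the setup in taosInitClient, but two gaps are visible in these hunks and worth confirming: pThrd->pTimer is malloc'd at init and never freed here, and the loop struct is freed without uv_loop_close(). A hedged sketch of a fuller teardown, assuming only the fields shown in this diff:

static void destroyThrdObj(SCliThrdObj* pThrd) {
  pthread_join(pThrd->thread, NULL); /* wait for the uv loop thread to exit */
  pthread_mutex_destroy(&pThrd->msgMtx);
  free(pThrd->pTimer);  /* malloc'd in taosInitClient; leaked as committed */
  free(pThrd->cliAsync);
  uv_loop_close(pThrd->loop); /* release loop resources before freeing the struct */
  free(pThrd->loop);
  free(pThrd);
}

Strictly, libuv also expects each handle to be uv_close()'d and the loop drained before uv_loop_close() will succeed; the sketch elides that for brevity.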
...