提交 bcd6df15 编写于 作者: dengyihao's avatar dengyihao

refactor rpc

上级 906e0e78
...@@ -52,7 +52,14 @@ typedef struct SClientObj { ...@@ -52,7 +52,14 @@ typedef struct SClientObj {
SCliThrdObj** pThreadObj; SCliThrdObj** pThreadObj;
} SClientObj; } SClientObj;
typedef struct SConnList {
queue conn;
} SConnList;
// conn pool // conn pool
// add expire timeout and capacity limit
static void* connCacheCreate(int size);
static void* connCacheDestroy(void* cache);
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
...@@ -81,6 +88,53 @@ static void clientProcessData(SCliConn* conn) { ...@@ -81,6 +88,53 @@ static void clientProcessData(SCliConn* conn) {
} }
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void* connCacheCreate(int size) {
SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
return false;
}
static void* connCacheDestroy(void* cache) {
SConnList* connList = taosHashIterate((SHashObj*)cache, NULL);
while (!QUEUE_IS_EMPTY(&connList->conn)) {
queue* h = QUEUE_HEAD(&connList->conn);
QUEUE_REMOVE(h);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
clientConnDestroy(c);
}
taosHashClear(cache);
}
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
char key[128] = {0};
tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
SHashObj* pCache = cache;
SConnList* plist = taosHashGet(pCache, key, strlen(key));
if (plist == NULL) {
SConnList list;
plist = &list;
QUEUE_INIT(&plist->conn);
taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist));
}
if (QUEUE_IS_EMPTY(&plist->conn)) {
return NULL;
}
queue* h = QUEUE_HEAD(&plist->conn);
QUEUE_REMOVE(h);
return QUEUE_DATA(h, SCliConn, conn);
}
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
char key[128] = {0};
tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
SHashObj* pCache = cache;
SConnList* plist = taosHashGet(pCache, key, strlen(key));
// list already create before
assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn);
}
static bool clientReadComplete(SConnBuffer* data) { static bool clientReadComplete(SConnBuffer* data) {
STransMsgHead head; STransMsgHead head;
int32_t headLen = sizeof(head); int32_t headLen = sizeof(head);
...@@ -206,15 +260,6 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -206,15 +260,6 @@ static void clientConnCb(uv_connect_t* req, int status) {
clientWrite(pConn); clientWrite(pConn);
} }
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
// impl later
return NULL;
}
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
// impl later
}
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st; uint64_t el = et - pMsg->st;
......
...@@ -210,8 +210,8 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) { ...@@ -210,8 +210,8 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
// refers specifically to query or insert timeout // refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) { static void uvHandleActivityTimeout(uv_timer_t* handle) {
// impl later
SConn* conn = handle->data; SConn* conn = handle->data;
tDebug("%p timeout since no activity", conn);
} }
static void uvProcessData(SConn* pConn) { static void uvProcessData(SConn* pConn) {
...@@ -232,12 +232,13 @@ static void uvProcessData(SConn* pConn) { ...@@ -232,12 +232,13 @@ static void uvProcessData(SConn* pConn) {
SRpcInfo* pRpc = (SRpcInfo*)p->shandle; SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
// auth here // auth here
// auth should not do in rpc thread
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); // int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
if (code != 0) { // if (code != 0) {
terrno = code; // terrno = code;
return; // return;
} //}
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
int32_t dlen = 0; int32_t dlen = 0;
...@@ -248,7 +249,7 @@ static void uvProcessData(SConn* pConn) { ...@@ -248,7 +249,7 @@ static void uvProcessData(SConn* pConn) {
} else { } else {
// impl later // impl later
} }
rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code; rpcMsg.code = pHead->code;
...@@ -318,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -318,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
return; return;
} }
uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
uv_timer_stop(conn->pTimer);
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
} }
} }
......
...@@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pEpSet) pInfo->epSet = *pEpSet; if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
// tsem_post(&pInfo->rspSem);
tsem_post(&pInfo->rspSem); tsem_post(&pInfo->rspSem);
} }
...@@ -60,6 +61,7 @@ static void *sendRequest(void *param) { ...@@ -60,6 +61,7 @@ static void *sendRequest(void *param) {
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
// tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册