提交 5e8cb50a 编写于 作者: dengyihao's avatar dengyihao

refactor rpc

上级 37dcc3de
...@@ -199,4 +199,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); ...@@ -199,4 +199,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
bool transCompressMsg(char* msg, int32_t len, int32_t* flen); bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transConnCtxDestroy(STransConnCtx* ctx);
#endif #endif
...@@ -63,6 +63,8 @@ static void clientAsyncCb(uv_async_t* handle); ...@@ -63,6 +63,8 @@ static void clientAsyncCb(uv_async_t* handle);
static void clientDestroy(uv_handle_t* handle); static void clientDestroy(uv_handle_t* handle);
static void clientConnDestroy(SCliConn* pConn); static void clientConnDestroy(SCliConn* pConn);
static void clientMsgDestroy(SCliMsg* pMsg);
static void* clientThread(void* arg); static void* clientThread(void* arg);
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
...@@ -235,9 +237,22 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -235,9 +237,22 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
} }
return cli; return cli;
} }
static void clientMsgDestroy(SCliMsg* pMsg) {
// impl later
free(pMsg);
}
void taosCloseClient(void* arg) { void taosCloseClient(void* arg) {
// impl later // impl later
SClientObj* cli = arg; SClientObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) {
SCliThrdObj* pThrd = cli->pThreadObj[i];
pthread_join(pThrd->thread, NULL);
pthread_mutex_destroy(&pThrd->msgMtx);
free(pThrd->loop);
free(pThrd);
}
free(cli->pThreadObj);
free(cli);
} }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
...@@ -247,19 +262,18 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* ...@@ -247,19 +262,18 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
SRpcInfo* pRpc = (SRpcInfo*)shandle; SRpcInfo* pRpc = (SRpcInfo*)shandle;
int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); int32_t flen = 0;
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// imp later
}
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->pRpc = (SRpcInfo*)shandle; pCtx->pRpc = (SRpcInfo*)shandle;
pCtx->ahandle = pMsg->ahandle; pCtx->ahandle = pMsg->ahandle;
// pContext->contLen = len;
// pContext->pCont = pMsg->pCont;
pCtx->msgType = pMsg->msgType; pCtx->msgType = pMsg->msgType;
pCtx->ip = strdup(ip); pCtx->ip = strdup(ip);
pCtx->port = port; pCtx->port = port;
// pContext->epSet = *pEpSet;
// pContext->oldInUse = pEpSet->inUse;
assert(pRpc->connType == TAOS_CONN_CLIENT); assert(pRpc->connType == TAOS_CONN_CLIENT);
// atomic or not // atomic or not
......
...@@ -107,6 +107,7 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { ...@@ -107,6 +107,7 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
} }
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
return false;
// SRpcHead* pHead = rpcHeadFromCont(pCont); // SRpcHead* pHead = rpcHeadFromCont(pCont);
bool succ = false; bool succ = false;
int overhead = sizeof(STransCompMsg); int overhead = sizeof(STransCompMsg);
...@@ -186,4 +187,8 @@ SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { ...@@ -186,4 +187,8 @@ SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
return pHead; return pHead;
} }
void transConnCtxDestroy(STransConnCtx* ctx) {
free(ctx->ip);
free(ctx);
}
#endif #endif
...@@ -496,6 +496,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -496,6 +496,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
int fds[2]; int fds[2];
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
...@@ -530,6 +531,17 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -530,6 +531,17 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
void taosCloseServer(void* arg) { void taosCloseServer(void* arg) {
// impl later // impl later
SServerObj* srv = arg; SServerObj* srv = arg;
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* pThrd = srv->pThreadObj[i];
pthread_join(pThrd->thread, NULL);
free(srv->pipe[i]);
free(pThrd->loop);
free(pThrd);
}
free(srv->loop);
free(srv->pipe);
free(srv->pThreadObj);
pthread_join(srv->thread, NULL);
} }
void rpcSendResponse(const SRpcMsg* pMsg) { void rpcSendResponse(const SRpcMsg* pMsg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册