未验证 提交 288f3600 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10028 from taosdata/feature/trans

refactor rpc
......@@ -95,8 +95,10 @@ static void clientHandleExcept(SCliConn* conn);
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientMsgDestroy(SCliMsg* pMsg);
static void destroyTransConnCtx(STransConnCtx* ctx);
static void destroyUserdata(SRpcMsg* userdata);
static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx);
// thread obj
static SCliThrdObj* createThrdObj();
static void destroyThrdObj(SCliThrdObj* pThrd);
......@@ -104,7 +106,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
static void* clientThread(void* arg);
static void clientHandleResp(SCliConn* conn) {
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pTransInst;
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
......@@ -122,41 +125,44 @@ static void clientHandleResp(SCliConn* conn) {
(pRpc->cfp)(NULL, &rpcMsg, NULL);
conn->notifyCount += 1;
SCliThrdObj* pThrd = conn->hostThrd;
tfree(conn->data);
// buf alread translated to rpcMsg.pCont
// buf's mem alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf);
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
SCliThrdObj* pThrd = conn->hostThrd;
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
destroyCmsg(pMsg);
conn->data = NULL;
// start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
destroyTransConnCtx(pCtx);
}
static void clientHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) {
// handle conn except in conn pool
clientConnDestroy(pConn, true);
return;
}
tDebug("conn %p destroy", pConn);
tDebug("conn %p start to destroy", pConn);
SCliMsg* pMsg = pConn->data;
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
destroyUserdata(&pMsg->msg);
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pTransInst;
SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = -1;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
tfree(pConn->data);
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
pConn->notifyCount += 1;
destroyTransConnCtx(pCtx);
destroyCmsg(pMsg);
pConn->data = NULL;
// transDestroyConnCtx(pCtx);
clientConnDestroy(pConn, true);
}
......@@ -236,6 +242,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
conn->notifyCount = 0;
......@@ -282,6 +289,9 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
}
assert(nread <= 0);
if (nread == 0) {
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
// read(2).
return;
}
if (nread < 0 || nread == UV_EOF) {
......@@ -321,11 +331,12 @@ static void clientWriteCb(uv_write_t* req, int status) {
if (status == 0) {
tDebug("conn %p data already was written out", pConn);
SCliMsg* pMsg = pConn->data;
if (pMsg != NULL) {
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
if (pMsg == NULL) {
destroy
// handle
return;
}
destroyUserdata(&pMsg->msg);
} else {
tError("conn %p failed to write: %s", pConn, uv_err_name(status));
clientHandleExcept(pConn);
......@@ -453,8 +464,20 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
}
return cli;
}
static void clientMsgDestroy(SCliMsg* pMsg) {
// impl later
static void destroyUserdata(SRpcMsg* userdata) {
if (userdata->pCont == NULL) {
return;
}
transFreeMsg(userdata->pCont);
userdata->pCont = NULL;
}
static void destroyCmsg(SCliMsg* pMsg) {
if (pMsg == NULL) {
return;
}
transDestroyConnCtx(pMsg->ctx);
destroyUserdata(&pMsg->msg);
free(pMsg);
}
static SCliThrdObj* createThrdObj() {
......@@ -487,7 +510,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
free(pThrd);
}
static void destroyTransConnCtx(STransConnCtx* ctx) {
static void transDestroyConnCtx(STransConnCtx* ctx) {
if (ctx != NULL) {
free(ctx->ip);
}
......
......@@ -91,7 +91,7 @@ static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
static void destroySrvMsg(SSrvConn* conn);
static void destroySmsg(SSrvMsg* smsg);
// check whether already read complete packet
static bool readComplete(SConnBuffer* buf);
static SSrvConn* createConn();
......@@ -305,8 +305,10 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnWriteCb(uv_write_t* req, int status) {
SSrvConn* conn = req->data;
SSrvMsg* smsg = conn->pSrvMsg;
destroySrvMsg(conn);
SSrvMsg* smsg = conn->pSrvMsg;
destroySmsg(smsg);
conn->pSrvMsg = NULL;
transClearBuffer(&conn->readBuf);
if (status == 0) {
......@@ -362,14 +364,12 @@ static void uvStartSendResp(SSrvMsg* smsg) {
return;
}
static void destroySrvMsg(SSrvConn* conn) {
SSrvMsg* smsg = conn->pSrvMsg;
static void destroySmsg(SSrvMsg* smsg) {
if (smsg == NULL) {
return;
}
transFreeMsg(smsg->msg.pCont);
free(conn->pSrvMsg);
conn->pSrvMsg = NULL;
free(smsg);
}
void uvWorkerAsyncCb(uv_async_t* handle) {
SWorkThrdObj* pThrd = handle->data;
......@@ -555,7 +555,8 @@ static void destroyConn(SSrvConn* conn, bool clear) {
return;
}
transDestroyBuffer(&conn->readBuf);
destroySrvMsg(conn);
destroySmsg(conn->pSrvMsg);
conn->pSrvMsg = NULL;
if (clear) {
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
......
......@@ -165,6 +165,7 @@ int main(int argc, char *argv[]) {
tError("failed to start RPC server");
return -1;
}
// sleep(5);
tInfo("RPC server is running, ctrl-c to exit");
......@@ -172,7 +173,6 @@ int main(int argc, char *argv[]) {
dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
if (dataFd < 0) tInfo("failed to open data file, reason:%s", strerror(errno));
}
qhandle = taosOpenQueue();
qset = taosOpenQset();
taosAddIntoQset(qset, qhandle, NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册