提交 b65bbcfd 编写于 作者: dengyihao's avatar dengyihao

refactor rpc

上级 5136d644
......@@ -30,7 +30,8 @@ void* rpcOpen(const SRpcInit* pInit) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
}
pRpc->cfp = pInit->cfp;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
// pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
pRpc->numOfThreads = pInit->numOfThreads;
pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
......
......@@ -139,18 +139,16 @@ static void clientHandleResp(SCliConn* conn) {
static void clientHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) {
clientConnDestroy(pConn, true);
return;
}
tDebug("conn %p destroy", pConn);
SCliMsg* pMsg = pConn->data;
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pTransInst;
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = -1;
......@@ -254,7 +252,8 @@ static bool clientReadComplete(SConnBuffer* data) {
if (msgLen > data->len) {
data->left = msgLen - data->len;
return false;
} else {
} else if (msgLen == data->len) {
data->left = 0;
return true;
}
} else {
......@@ -321,19 +320,19 @@ static void clientWriteCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data;
if (status == 0) {
tDebug("conn %p data already was written out", pConn);
SCliMsg* pMsg = pConn->data;
if (pMsg != NULL) {
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
}
} else {
tError("conn %p failed to write: %s", pConn, uv_err_name(status));
clientHandleExcept(pConn);
return;
}
SCliThrdObj* pThrd = pConn->hostThrd;
// if (pConn->stream == NULL) {
// pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
// uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
// pConn->stream->data = pConn;
//}
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
// impl later
}
static void clientWrite(SCliConn* pConn) {
......@@ -378,7 +377,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("conn %p get from conn pool", conn);
conn->data = pMsg;
conn->writeReq->data = conn;
// transDestroyBuffer(&conn->readBuf);
transDestroyBuffer(&conn->readBuf);
clientWrite(conn);
} else {
SCliConn* conn = calloc(1, sizeof(SCliConn));
......
......@@ -306,6 +306,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnWriteCb(uv_write_t* req, int status) {
SSrvConn* conn = req->data;
SSrvMsg* smsg = conn->pSrvMsg;
destroySrvMsg(conn);
transClearBuffer(&conn->readBuf);
if (status == 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册