提交 3b5f9215 编写于 作者: dengyihao's avatar dengyihao

refactor rpc

上级 4cfe9a48
...@@ -202,6 +202,8 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); ...@@ -202,6 +202,8 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transConnCtxDestroy(STransConnCtx* ctx); void transConnCtxDestroy(STransConnCtx* ctx);
void transFreeMsg(void* msg); void transFreeMsg(void* msg);
//
typedef struct SConnBuffer { typedef struct SConnBuffer {
char* buf; char* buf;
int len; int len;
...@@ -209,4 +211,9 @@ typedef struct SConnBuffer { ...@@ -209,4 +211,9 @@ typedef struct SConnBuffer {
int left; int left;
} SConnBuffer; } SConnBuffer;
int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
#endif #endif
...@@ -55,7 +55,13 @@ void* rpcMallocCont(int contLen) { ...@@ -55,7 +55,13 @@ void* rpcMallocCont(int contLen) {
} }
return start + sizeof(STransMsgHead); return start + sizeof(STransMsgHead);
} }
void rpcFreeCont(void* cont) { return; } void rpcFreeCont(void* cont) {
// impl
if (cont == NULL) {
return;
}
free((char*)cont - TRANS_MSG_OVERHEAD);
}
void* rpcReallocCont(void* ptr, int contLen) { return NULL; } void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
......
...@@ -112,16 +112,20 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -112,16 +112,20 @@ static void clientHandleResp(SCliConn* conn) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = transContFromHead(pHead); rpcMsg.pCont = transContFromHead((char*)pHead);
rpcMsg.code = pHead->code; rpcMsg.code = pHead->code;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
tDebug("conn %p handle resp", conn);
(pRpc->cfp)(NULL, &rpcMsg, NULL); (pRpc->cfp)(NULL, &rpcMsg, NULL);
conn->notifyCount += 1; conn->notifyCount += 1;
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
tfree(conn->data); tfree(conn->data);
// buf alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf);
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
...@@ -131,6 +135,11 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -131,6 +135,11 @@ static void clientHandleResp(SCliConn* conn) {
destroyTransConnCtx(pCtx); destroyTransConnCtx(pCtx);
} }
static void clientHandleExcept(SCliConn* pConn) { static void clientHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) {
clientConnDestroy(pConn, true);
return;
}
tDebug("conn %p destroy", pConn);
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
...@@ -213,12 +222,17 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -213,12 +222,17 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
} }
queue* h = QUEUE_HEAD(&plist->conn); queue* h = QUEUE_HEAD(&plist->conn);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
return QUEUE_DATA(h, SCliConn, conn);
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
QUEUE_INIT(&conn->conn);
return conn;
} }
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
char key[128] = {0}; char key[128] = {0};
tstrncpy(key, ip, strlen(ip)); tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
...@@ -245,32 +259,9 @@ static bool clientReadComplete(SConnBuffer* data) { ...@@ -245,32 +259,9 @@ static bool clientReadComplete(SConnBuffer* data) {
} }
} }
static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
// impl later
static const int CAPACITY = 512;
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (pBuf->cap == 0) { transAllocBuffer(pBuf, buf);
pBuf->buf = (char*)calloc(1, CAPACITY * sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf;
buf->len = CAPACITY;
} else {
if (pBuf->len >= pBuf->cap) {
if (pBuf->left == -1) {
pBuf->cap *= 2;
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
}
}
buf->base = pBuf->buf + pBuf->len;
buf->len = pBuf->cap - pBuf->len;
}
} }
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later // impl later
...@@ -288,10 +279,9 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -288,10 +279,9 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
} }
assert(nread <= 0); assert(nread <= 0);
if (nread == 0) { if (nread == 0) {
tError("conn %p closed", conn);
return; return;
} }
if (nread < 0) { if (nread < 0 || nread == UV_EOF) {
tError("conn %p read error: %s", conn, uv_err_name(nread)); tError("conn %p read error: %s", conn, uv_err_name(nread));
clientHandleExcept(conn); clientHandleExcept(conn);
} }
...@@ -300,28 +290,28 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -300,28 +290,28 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
} }
static void clientConnDestroy(SCliConn* conn, bool clear) { static void clientConnDestroy(SCliConn* conn, bool clear) {
tDebug("conn %p destroy", conn); //
tDebug("conn %p remove from conn pool", conn);
QUEUE_REMOVE(&conn->conn);
tDebug("conn %p remove from conn pool successfully", conn);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->stream, NULL); uv_close((uv_handle_t*)conn->stream, clientDestroy);
} }
free(conn->stream);
free(conn->readBuf.buf);
free(conn->writeReq);
free(conn);
} }
static void clientDestroy(uv_handle_t* handle) { static void clientDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
// QUEUE_REMOVE(&conn->conn); transDestroyBuffer(&conn->readBuf);
clientConnDestroy(conn, false);
free(conn->stream);
free(conn->writeReq);
tDebug("conn %p destroy successfully", conn);
free(conn);
// clientConnDestroy(conn, false);
} }
static void clientWriteCb(uv_write_t* req, int status) { static void clientWriteCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
SCliMsg* pMsg = pConn->data;
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
if (status == 0) { if (status == 0) {
tDebug("conn %p data already was written out", pConn); tDebug("conn %p data already was written out", pConn);
} else { } else {
...@@ -381,10 +371,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -381,10 +371,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("conn %p get from conn pool", conn); tDebug("conn %p get from conn pool", conn);
conn->data = pMsg; conn->data = pMsg;
conn->writeReq->data = conn; conn->writeReq->data = conn;
transDestroyBuffer(&conn->readBuf);
conn->readBuf.len = 0;
memset(conn->readBuf.buf, 0, conn->readBuf.cap);
conn->readBuf.left = -1;
clientWrite(conn); clientWrite(conn);
} else { } else {
SCliConn* conn = calloc(1, sizeof(SCliConn)); SCliConn* conn = calloc(1, sizeof(SCliConn));
...@@ -397,6 +384,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -397,6 +384,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
// write req handle // write req handle
conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq = malloc(sizeof(uv_write_t));
conn->writeReq->data = conn; conn->writeReq->data = conn;
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
conn->connReq.data = conn; conn->connReq.data = conn;
......
...@@ -198,4 +198,51 @@ void transFreeMsg(void* msg) { ...@@ -198,4 +198,51 @@ void transFreeMsg(void* msg) {
} }
free((char*)msg - sizeof(STransMsgHead)); free((char*)msg - sizeof(STransMsgHead));
} }
int transInitBuffer(SConnBuffer* buf) {
transClearBuffer(buf);
return 0;
}
int transClearBuffer(SConnBuffer* buf) {
memset(buf, 0, sizeof(*buf));
return 0;
}
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
/*
* formate of data buffer:
* |<--------------------------data from socket------------------------------->|
* |<------STransMsgHead------->|<-------------------other data--------------->|
*/
static const int CAPACITY = 1024;
SConnBuffer* p = connBuf;
if (p->cap == 0) {
p->buf = (char*)calloc(CAPACITY, sizeof(char));
p->len = 0;
p->cap = CAPACITY;
p->left = -1;
uvBuf->base = p->buf;
uvBuf->len = CAPACITY;
} else {
if (p->len >= p->cap) {
if (p->left == -1) {
p->cap *= 2;
p->buf = realloc(p->buf, p->cap);
} else if (p->len + p->left > p->cap) {
p->cap = p->len + p->left;
p->buf = realloc(p->buf, p->len + p->left);
}
}
uvBuf->base = p->buf + p->len;
uvBuf->len = p->cap - p->len;
}
return 0;
}
int transDestroyBuffer(SConnBuffer* buf) {
if (buf->cap > 0) {
tfree(buf->buf);
}
transClearBuffer(buf);
}
#endif #endif
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "transComm.h" #include "transComm.h"
typedef struct SConn { typedef struct SSrvConn {
uv_tcp_t* pTcp; uv_tcp_t* pTcp;
uv_write_t* pWriter; uv_write_t* pWriter;
uv_timer_t* pTimer; uv_timer_t* pTimer;
...@@ -26,13 +26,14 @@ typedef struct SConn { ...@@ -26,13 +26,14 @@ typedef struct SConn {
queue queue; queue queue;
int ref; int ref;
int persist; // persist connection or not int persist; // persist connection or not
SConnBuffer connBuf; // read buf, SConnBuffer readBuf; // read buf,
int inType; int inType;
void* pTransInst; // rpc init void* pTransInst; // rpc init
void* ahandle; // void* ahandle; //
void* hostThrd; void* hostThrd;
void* pSrvMsg;
SRpcMsg sendMsg; // SRpcMsg sendMsg;
// del later // del later
char secured; char secured;
int spi; int spi;
...@@ -40,7 +41,13 @@ typedef struct SConn { ...@@ -40,7 +41,13 @@ typedef struct SConn {
char user[TSDB_UNI_LEN]; // user ID for the link char user[TSDB_UNI_LEN]; // user ID for the link
char secret[TSDB_PASSWORD_LEN]; char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN]; // ciphering key char ckey[TSDB_PASSWORD_LEN]; // ciphering key
} SConn; } SSrvConn;
typedef struct SSrvMsg {
SSrvConn* pConn;
SRpcMsg msg;
queue q;
} SSrvMsg;
typedef struct SWorkThrdObj { typedef struct SWorkThrdObj {
pthread_t thread; pthread_t thread;
...@@ -48,8 +55,8 @@ typedef struct SWorkThrdObj { ...@@ -48,8 +55,8 @@ typedef struct SWorkThrdObj {
int fd; int fd;
uv_loop_t* loop; uv_loop_t* loop;
uv_async_t* workerAsync; // uv_async_t* workerAsync; //
queue conn; queue msg;
pthread_mutex_t connMtx; pthread_mutex_t msgMtx;
void* pTransInst; void* pTransInst;
} SWorkThrdObj; } SWorkThrdObj;
...@@ -68,9 +75,9 @@ typedef struct SServerObj { ...@@ -68,9 +75,9 @@ typedef struct SServerObj {
static const char* notify = "a"; static const char* notify = "a";
// refactor later // refactor later
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen); static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
static int uvAuthMsg(SConn* pConn, char* msg, int msgLen); static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
...@@ -82,12 +89,13 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status); ...@@ -82,12 +89,13 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle); static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
static void destroySrvMsg(SSrvConn* conn);
// check whether already read complete packet // check whether already read complete packet
static bool readComplete(SConnBuffer* buf); static bool readComplete(SConnBuffer* buf);
static SConn* createConn(); static SSrvConn* createConn();
static void destroyConn(SConn* conn, bool clear /*clear handle or not*/); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
static void uvDestroyConn(uv_handle_t* handle); static void uvDestroyConn(uv_handle_t* handle);
...@@ -105,31 +113,9 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b ...@@ -105,31 +113,9 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
* |<--------------------------data from socket------------------------------->| * |<--------------------------data from socket------------------------------->|
* |<------STransMsgHead------->|<-------------------other data--------------->| * |<------STransMsgHead------->|<-------------------other data--------------->|
*/ */
static const int CAPACITY = 1024; SSrvConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
SConn* conn = handle->data; transAllocBuffer(pBuf, buf);
SConnBuffer* pBuf = &conn->connBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf;
buf->len = CAPACITY;
} else {
if (pBuf->len >= pBuf->cap) {
if (pBuf->left == -1) {
pBuf->cap *= 2;
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
}
}
buf->base = pBuf->buf + pBuf->len;
buf->len = pBuf->cap - pBuf->len;
}
} }
// check data read from socket completely or not // check data read from socket completely or not
...@@ -159,7 +145,7 @@ static bool readComplete(SConnBuffer* data) { ...@@ -159,7 +145,7 @@ static bool readComplete(SConnBuffer* data) {
// // impl later // // impl later
// STransMsgHead* pHead = (STransMsgHead*)pRecv->msg; // STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
// SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; // SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
// SConn* pConn = pRecv->thandle; // SSrvConn* pConn = pRecv->thandle;
// tDump(pRecv->msg, pRecv->msgLen); // tDump(pRecv->msg, pRecv->msgLen);
// terrno = 0; // terrno = 0;
// // SRpcReqContext* pContest; // // SRpcReqContext* pContest;
...@@ -167,7 +153,7 @@ static bool readComplete(SConnBuffer* data) { ...@@ -167,7 +153,7 @@ static bool readComplete(SConnBuffer* data) {
// // do auth and check // // do auth and check
//} //}
static int uvAuthMsg(SConn* pConn, char* msg, int len) { static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
int code = 0; int code = 0;
...@@ -222,14 +208,14 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) { ...@@ -222,14 +208,14 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
// refers specifically to query or insert timeout // refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) { static void uvHandleActivityTimeout(uv_timer_t* handle) {
SConn* conn = handle->data; SSrvConn* conn = handle->data;
tDebug("%p timeout since no activity", conn); tDebug("%p timeout since no activity", conn);
} }
static void uvHandleReq(SConn* pConn) { static void uvHandleReq(SSrvConn* pConn) {
SRecvInfo info; SRecvInfo info;
SRecvInfo* p = &info; SRecvInfo* p = &info;
SConnBuffer* pBuf = &pConn->connBuf; SConnBuffer* pBuf = &pConn->readBuf;
p->msg = pBuf->buf; p->msg = pBuf->buf;
p->msgLen = pBuf->len; p->msgLen = pBuf->len;
p->ip = 0; p->ip = 0;
...@@ -255,7 +241,6 @@ static void uvHandleReq(SConn* pConn) { ...@@ -255,7 +241,6 @@ static void uvHandleReq(SConn* pConn) {
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
int32_t dlen = 0; int32_t dlen = 0;
SRpcMsg rpcMsg;
if (transDecompressMsg(NULL, 0, NULL)) { if (transDecompressMsg(NULL, 0, NULL)) {
// add compress later // add compress later
// pHead = rpcDecompressRpcMsg(pHead); // pHead = rpcDecompressRpcMsg(pHead);
...@@ -264,6 +249,8 @@ static void uvHandleReq(SConn* pConn) { ...@@ -264,6 +249,8 @@ static void uvHandleReq(SConn* pConn) {
// impl later // impl later
// //
} }
SRpcMsg rpcMsg;
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
...@@ -271,6 +258,7 @@ static void uvHandleReq(SConn* pConn) { ...@@ -271,6 +258,7 @@ static void uvHandleReq(SConn* pConn) {
rpcMsg.ahandle = NULL; rpcMsg.ahandle = NULL;
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
transClearBuffer(&pConn->readBuf);
pConn->ref++; pConn->ref++;
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
...@@ -280,8 +268,8 @@ static void uvHandleReq(SConn* pConn) { ...@@ -280,8 +268,8 @@ static void uvHandleReq(SConn* pConn) {
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt // opt
SConn* conn = cli->data; SSrvConn* conn = cli->data;
SConnBuffer* pBuf = &conn->connBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread); tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
...@@ -294,11 +282,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -294,11 +282,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
if (nread == 0) { if (nread == 0) {
tDebug("conn %p except read", conn);
// destroyConn(conn, true);
return; return;
} }
if (nread != UV_EOF) { if (nread < 0 || nread != UV_EOF) {
if (conn->ref > 1) {
conn->ref++; // ref > 1 signed that write is in progress
}
tDebug("conn %p read error: %s", conn, uv_err_name(nread)); tDebug("conn %p read error: %s", conn, uv_err_name(nread));
destroyConn(conn, true); destroyConn(conn, true);
} }
...@@ -310,25 +299,20 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b ...@@ -310,25 +299,20 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnTimeoutCb(uv_timer_t* handle) {
// opt // opt
SConn* pConn = handle->data; SSrvConn* pConn = handle->data;
tDebug("conn %p time out", pConn); tDebug("conn %p time out", pConn);
} }
void uvOnWriteCb(uv_write_t* req, int status) { void uvOnWriteCb(uv_write_t* req, int status) {
SConn* conn = req->data; SSrvConn* conn = req->data;
SSrvMsg* smsg = conn->pSrvMsg;
SConnBuffer* buf = &conn->connBuf;
buf->len = 0;
memset(buf->buf, 0, buf->cap);
buf->left = -1;
SRpcMsg* pMsg = &conn->sendMsg;
transFreeMsg(pMsg->pCont);
transClearBuffer(&conn->readBuf);
if (status == 0) { if (status == 0) {
tDebug("conn %p data already was written on stream", conn); tDebug("conn %p data already was written on stream", conn);
} else { } else {
tDebug("conn %p failed to write data, %s", conn, uv_err_name(status)); tDebug("conn %p failed to write data, %s", conn, uv_err_name(status));
//
destroyConn(conn, true); destroyConn(conn, true);
} }
// opt // opt
...@@ -341,16 +325,16 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { ...@@ -341,16 +325,16 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
} }
} }
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
// impl later; // impl later;
tDebug("conn %p prepare to send resp", conn); tDebug("conn %p prepare to send resp", smsg->pConn);
SRpcMsg* pMsg = &conn->sendMsg; SRpcMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0); pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0; pMsg->contLen = 0;
} }
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->msgType = conn->inType + 1; pHead->msgType = smsg->pConn->inType + 1;
// add more info // add more info
char* msg = (char*)pHead; char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen); int32_t len = transMsgLenFromCont(pMsg->contLen);
...@@ -361,28 +345,55 @@ static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { ...@@ -361,28 +345,55 @@ static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
wb->base = msg; wb->base = msg;
wb->len = len; wb->len = len;
} }
static void uvStartSendResp(SSrvMsg* smsg) {
// impl
uv_buf_t wb;
uvPrepareSendData(smsg, &wb);
SSrvConn* pConn = smsg->pConn;
uv_timer_stop(pConn->pTimer);
pConn->pSrvMsg = smsg;
// conn->pWriter->data = smsg;
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
// SRpcMsg* rpcMsg = smsg->msg;
return;
}
static void destroySrvMsg(SSrvConn* conn) {
SSrvMsg* smsg = conn->pSrvMsg;
if (smsg == NULL) {
return;
}
transFreeMsg(smsg->msg.pCont);
free(conn->pSrvMsg);
conn->pSrvMsg = NULL;
}
void uvWorkerAsyncCb(uv_async_t* handle) { void uvWorkerAsyncCb(uv_async_t* handle) {
SWorkThrdObj* pThrd = handle->data; SWorkThrdObj* pThrd = handle->data;
SConn* conn = NULL; SSrvConn* conn = NULL;
queue wq; queue wq;
// batch process to avoid to lock/unlock frequently // batch process to avoid to lock/unlock frequently
pthread_mutex_lock(&pThrd->connMtx); pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_MOVE(&pThrd->conn, &wq); QUEUE_MOVE(&pThrd->msg, &wq);
pthread_mutex_unlock(&pThrd->connMtx); pthread_mutex_unlock(&pThrd->msgMtx);
while (!QUEUE_IS_EMPTY(&wq)) { while (!QUEUE_IS_EMPTY(&wq)) {
queue* head = QUEUE_HEAD(&wq); queue* head = QUEUE_HEAD(&wq);
QUEUE_REMOVE(head); QUEUE_REMOVE(head);
SConn* conn = QUEUE_DATA(head, SConn, queue);
if (conn == NULL) { SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
tError("except occurred, do nothing"); if (msg == NULL) {
return; tError("except occurred, continue");
continue;
} }
uv_buf_t wb; uvStartSendResp(msg);
uvPrepareSendData(conn, &wb); // uv_buf_t wb;
uv_timer_stop(conn->pTimer); // uvPrepareSendData(msg, &wb);
// uv_timer_stop(conn->pTimer);
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); // uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
} }
} }
...@@ -435,7 +446,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -435,7 +446,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_handle_type pending = uv_pipe_pending_type(pipe); uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP); assert(pending == UV_TCP);
SConn* pConn = createConn(); SSrvConn* pConn = createConn();
pConn->pTransInst = pThrd->pTransInst; pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/ /* init conn timer*/
...@@ -484,8 +495,8 @@ static bool addHandleToWorkloop(void* arg) { ...@@ -484,8 +495,8 @@ static bool addHandleToWorkloop(void* arg) {
pThrd->pipe->data = pThrd; pThrd->pipe->data = pThrd;
QUEUE_INIT(&pThrd->conn); QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->connMtx, NULL); pthread_mutex_init(&pThrd->msgMtx, NULL);
pThrd->workerAsync = malloc(sizeof(uv_async_t)); pThrd->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
...@@ -523,34 +534,42 @@ void* workerThread(void* arg) { ...@@ -523,34 +534,42 @@ void* workerThread(void* arg) {
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
static SConn* createConn() { static SSrvConn* createConn() {
SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
tDebug("conn %p created", pConn);
++pConn->ref; ++pConn->ref;
return pConn; return pConn;
} }
static void destroyConn(SConn* conn, bool clear) { static void destroyConn(SSrvConn* conn, bool clear) {
if (conn == NULL) { if (conn == NULL) {
return; return;
} }
if (--conn->ref == 0) { // SRpcMsg* pMsg = &conn->sendMsg;
// transFreeMsg(pMsg->pCont);
// pMsg->pCont = NULL;
tDebug("conn %p try to destroy", conn);
if (--conn->ref > 0) {
return; return;
} }
transDestroyBuffer(&conn->readBuf);
destroySrvMsg(conn);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->pTcp, NULL); uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
} }
}
static void uvDestroyConn(uv_handle_t* handle) {
SSrvConn* conn = handle->data;
tDebug("conn %p destroy", conn);
uv_timer_stop(conn->pTimer); uv_timer_stop(conn->pTimer);
free(conn->pTimer); free(conn->pTimer);
free(conn->pTcp); // free(conn->pTcp);
free(conn->connBuf.buf);
free(conn->pWriter); free(conn->pWriter);
free(conn); free(conn);
} }
static void uvDestroyConn(uv_handle_t* handle) { static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
SConn* conn = handle->data;
destroyConn(conn, false);
}
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) {
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
if (pConn->spi && pConn->secured == 0) { if (pConn->spi && pConn->secured == 0) {
...@@ -632,6 +651,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { ...@@ -632,6 +651,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
pthread_join(pThrd->thread, NULL); pthread_join(pThrd->thread, NULL);
// free(srv->pipe[i]); // free(srv->pipe[i]);
free(pThrd->loop); free(pThrd->loop);
pthread_mutex_destroy(&pThrd->msgMtx);
free(pThrd); free(pThrd);
} }
void taosCloseServer(void* arg) { void taosCloseServer(void* arg) {
...@@ -648,17 +668,20 @@ void taosCloseServer(void* arg) { ...@@ -648,17 +668,20 @@ void taosCloseServer(void* arg) {
} }
void rpcSendResponse(const SRpcMsg* pMsg) { void rpcSendResponse(const SRpcMsg* pMsg) {
SConn* pConn = pMsg->handle; SSrvConn* pConn = pMsg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd; SWorkThrdObj* pThrd = pConn->hostThrd;
// opt later SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
pConn->sendMsg = *pMsg; srvMsg->pConn = pConn;
pthread_mutex_lock(&pThrd->connMtx); srvMsg->msg = *pMsg;
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pthread_mutex_unlock(&pThrd->connMtx); pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("conn %p start to send resp", pConn); tDebug("conn %p start to send resp", pConn);
uv_async_send(pConn->pWorkerAsync); uv_async_send(pThrd->workerAsync);
} }
#endif #endif
...@@ -64,6 +64,7 @@ static void *sendRequest(void *param) { ...@@ -64,6 +64,7 @@ static void *sendRequest(void *param) {
// tsem_wait(&pInfo->rspSem); // tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem);
tDebug("recv response succefully"); tDebug("recv response succefully");
// usleep(100000000); // usleep(100000000);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册