未验证 提交 157623f0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #10019 from taosdata/feature/trans

refactor rpc
......@@ -202,6 +202,8 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transConnCtxDestroy(STransConnCtx* ctx);
void transFreeMsg(void* msg);
//
typedef struct SConnBuffer {
char* buf;
int len;
......@@ -209,4 +211,9 @@ typedef struct SConnBuffer {
int left;
} SConnBuffer;
int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
#endif
......@@ -30,7 +30,8 @@ void* rpcOpen(const SRpcInit* pInit) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
}
pRpc->cfp = pInit->cfp;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
// pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
pRpc->numOfThreads = pInit->numOfThreads;
pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
......@@ -55,7 +56,13 @@ void* rpcMallocCont(int contLen) {
}
return start + sizeof(STransMsgHead);
}
void rpcFreeCont(void* cont) { return; }
void rpcFreeCont(void* cont) {
// impl
if (cont == NULL) {
return;
}
free((char*)cont - TRANS_MSG_OVERHEAD);
}
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
......
......@@ -31,6 +31,7 @@ typedef struct SCliConn {
char secured;
uint64_t expireTime;
int8_t notifyCount; // timers already notify to client
int32_t ref;
} SCliConn;
typedef struct SCliMsg {
......@@ -112,16 +113,21 @@ static void clientHandleResp(SCliConn* conn) {
SRpcMsg rpcMsg;
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = transContFromHead(pHead);
rpcMsg.pCont = transContFromHead((char*)pHead);
rpcMsg.code = pHead->code;
rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle;
tDebug("conn %p handle resp", conn);
(pRpc->cfp)(NULL, &rpcMsg, NULL);
conn->notifyCount += 1;
SCliThrdObj* pThrd = conn->hostThrd;
tfree(conn->data);
// buf alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf);
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
// start thread's timer of conn pool if not active
......@@ -131,14 +137,18 @@ static void clientHandleResp(SCliConn* conn) {
destroyTransConnCtx(pCtx);
}
static void clientHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) {
clientConnDestroy(pConn, true);
return;
}
tDebug("conn %p destroy", pConn);
SCliMsg* pMsg = pConn->data;
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pTransInst;
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = -1;
......@@ -213,12 +223,17 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
}
queue* h = QUEUE_HEAD(&plist->conn);
QUEUE_REMOVE(h);
return QUEUE_DATA(h, SCliConn, conn);
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
QUEUE_INIT(&conn->conn);
return conn;
}
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
char key[128] = {0};
tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
......@@ -237,40 +252,18 @@ static bool clientReadComplete(SConnBuffer* data) {
if (msgLen > data->len) {
data->left = msgLen - data->len;
return false;
} else {
} else if (msgLen == data->len) {
data->left = 0;
return true;
}
} else {
return false;
}
}
static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
// impl later
static const int CAPACITY = 512;
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(1, CAPACITY * sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf;
buf->len = CAPACITY;
} else {
if (pBuf->len >= pBuf->cap) {
if (pBuf->left == -1) {
pBuf->cap *= 2;
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
}
}
buf->base = pBuf->buf + pBuf->len;
buf->len = pBuf->cap - pBuf->len;
}
transAllocBuffer(pBuf, buf);
}
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later
......@@ -279,6 +272,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) {
pBuf->len += nread;
if (clientReadComplete(pBuf)) {
uv_read_stop((uv_stream_t*)conn->stream);
tDebug("conn %p read complete", conn);
clientHandleResp(conn);
} else {
......@@ -288,10 +282,9 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
}
assert(nread <= 0);
if (nread == 0) {
tError("conn %p closed", conn);
return;
}
if (nread < 0) {
if (nread < 0 || nread == UV_EOF) {
tError("conn %p read error: %s", conn, uv_err_name(nread));
clientHandleExcept(conn);
}
......@@ -300,43 +293,46 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
}
static void clientConnDestroy(SCliConn* conn, bool clear) {
tDebug("conn %p destroy", conn);
if (clear) {
uv_close((uv_handle_t*)conn->stream, NULL);
//
conn->ref--;
if (conn->ref == 0) {
tDebug("conn %p remove from conn pool", conn);
QUEUE_REMOVE(&conn->conn);
tDebug("conn %p remove from conn pool successfully", conn);
if (clear) {
uv_close((uv_handle_t*)conn->stream, clientDestroy);
}
}
free(conn->stream);
free(conn->readBuf.buf);
free(conn->writeReq);
free(conn);
}
static void clientDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data;
// QUEUE_REMOVE(&conn->conn);
clientConnDestroy(conn, false);
// transDestroyBuffer(&conn->readBuf);
free(conn->stream);
free(conn->writeReq);
tDebug("conn %p destroy successfully", conn);
free(conn);
// clientConnDestroy(conn, false);
}
static void clientWriteCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data;
SCliMsg* pMsg = pConn->data;
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
if (status == 0) {
tDebug("conn %p data already was written out", pConn);
SCliMsg* pMsg = pConn->data;
if (pMsg != NULL) {
transFreeMsg((pMsg->msg.pCont));
pMsg->msg.pCont = NULL;
}
} else {
tError("conn %p failed to write: %s", pConn, uv_err_name(status));
clientHandleExcept(pConn);
return;
}
SCliThrdObj* pThrd = pConn->hostThrd;
// if (pConn->stream == NULL) {
// pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
// uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
// pConn->stream->data = pConn;
//}
uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb);
// impl later
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
}
static void clientWrite(SCliConn* pConn) {
......@@ -381,14 +377,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("conn %p get from conn pool", conn);
conn->data = pMsg;
conn->writeReq->data = conn;
conn->readBuf.len = 0;
memset(conn->readBuf.buf, 0, conn->readBuf.cap);
conn->readBuf.left = -1;
transDestroyBuffer(&conn->readBuf);
clientWrite(conn);
} else {
SCliConn* conn = calloc(1, sizeof(SCliConn));
conn->ref++;
// read/write stream handle
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
......@@ -397,6 +390,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
// write req handle
conn->writeReq = malloc(sizeof(uv_write_t));
conn->writeReq->data = conn;
QUEUE_INIT(&conn->conn);
conn->connReq.data = conn;
......
......@@ -198,4 +198,51 @@ void transFreeMsg(void* msg) {
}
free((char*)msg - sizeof(STransMsgHead));
}
int transInitBuffer(SConnBuffer* buf) {
transClearBuffer(buf);
return 0;
}
int transClearBuffer(SConnBuffer* buf) {
memset(buf, 0, sizeof(*buf));
return 0;
}
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
/*
* formate of data buffer:
* |<--------------------------data from socket------------------------------->|
* |<------STransMsgHead------->|<-------------------other data--------------->|
*/
static const int CAPACITY = 1024;
SConnBuffer* p = connBuf;
if (p->cap == 0) {
p->buf = (char*)calloc(CAPACITY, sizeof(char));
p->len = 0;
p->cap = CAPACITY;
p->left = -1;
uvBuf->base = p->buf;
uvBuf->len = CAPACITY;
} else {
if (p->len >= p->cap) {
if (p->left == -1) {
p->cap *= 2;
p->buf = realloc(p->buf, p->cap);
} else if (p->len + p->left > p->cap) {
p->cap = p->len + p->left;
p->buf = realloc(p->buf, p->len + p->left);
}
}
uvBuf->base = p->buf + p->len;
uvBuf->len = p->cap - p->len;
}
return 0;
}
int transDestroyBuffer(SConnBuffer* buf) {
if (buf->cap > 0) {
tfree(buf->buf);
}
transClearBuffer(buf);
}
#endif
......@@ -17,7 +17,7 @@
#include "transComm.h"
typedef struct SConn {
typedef struct SSrvConn {
uv_tcp_t* pTcp;
uv_write_t* pWriter;
uv_timer_t* pTimer;
......@@ -26,13 +26,14 @@ typedef struct SConn {
queue queue;
int ref;
int persist; // persist connection or not
SConnBuffer connBuf; // read buf,
SConnBuffer readBuf; // read buf,
int inType;
void* pTransInst; // rpc init
void* ahandle; //
void* hostThrd;
void* pSrvMsg;
SRpcMsg sendMsg;
// SRpcMsg sendMsg;
// del later
char secured;
int spi;
......@@ -40,7 +41,13 @@ typedef struct SConn {
char user[TSDB_UNI_LEN]; // user ID for the link
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
} SConn;
} SSrvConn;
typedef struct SSrvMsg {
SSrvConn* pConn;
SRpcMsg msg;
queue q;
} SSrvMsg;
typedef struct SWorkThrdObj {
pthread_t thread;
......@@ -48,8 +55,8 @@ typedef struct SWorkThrdObj {
int fd;
uv_loop_t* loop;
uv_async_t* workerAsync; //
queue conn;
pthread_mutex_t connMtx;
queue msg;
pthread_mutex_t msgMtx;
void* pTransInst;
} SWorkThrdObj;
......@@ -68,9 +75,9 @@ typedef struct SServerObj {
static const char* notify = "a";
// refactor later
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen);
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
static int uvAuthMsg(SConn* pConn, char* msg, int msgLen);
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
......@@ -82,12 +89,13 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb);
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
static void destroySrvMsg(SSrvConn* conn);
// check whether already read complete packet
static bool readComplete(SConnBuffer* buf);
static SConn* createConn();
static void destroyConn(SConn* conn, bool clear /*clear handle or not*/);
static bool readComplete(SConnBuffer* buf);
static SSrvConn* createConn();
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
static void uvDestroyConn(uv_handle_t* handle);
......@@ -105,31 +113,9 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
* |<--------------------------data from socket------------------------------->|
* |<------STransMsgHead------->|<-------------------other data--------------->|
*/
static const int CAPACITY = 1024;
SConn* conn = handle->data;
SConnBuffer* pBuf = &conn->connBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf;
buf->len = CAPACITY;
} else {
if (pBuf->len >= pBuf->cap) {
if (pBuf->left == -1) {
pBuf->cap *= 2;
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
}
}
buf->base = pBuf->buf + pBuf->len;
buf->len = pBuf->cap - pBuf->len;
}
SSrvConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf);
}
// check data read from socket completely or not
......@@ -159,7 +145,7 @@ static bool readComplete(SConnBuffer* data) {
// // impl later
// STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
// SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
// SConn* pConn = pRecv->thandle;
// SSrvConn* pConn = pRecv->thandle;
// tDump(pRecv->msg, pRecv->msgLen);
// terrno = 0;
// // SRpcReqContext* pContest;
......@@ -167,7 +153,7 @@ static bool readComplete(SConnBuffer* data) {
// // do auth and check
//}
static int uvAuthMsg(SConn* pConn, char* msg, int len) {
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
STransMsgHead* pHead = (STransMsgHead*)msg;
int code = 0;
......@@ -222,14 +208,14 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
SConn* conn = handle->data;
SSrvConn* conn = handle->data;
tDebug("%p timeout since no activity", conn);
}
static void uvHandleReq(SConn* pConn) {
static void uvHandleReq(SSrvConn* pConn) {
SRecvInfo info;
SRecvInfo* p = &info;
SConnBuffer* pBuf = &pConn->connBuf;
SConnBuffer* pBuf = &pConn->readBuf;
p->msg = pBuf->buf;
p->msgLen = pBuf->len;
p->ip = 0;
......@@ -255,7 +241,6 @@ static void uvHandleReq(SConn* pConn) {
pHead->code = htonl(pHead->code);
int32_t dlen = 0;
SRpcMsg rpcMsg;
if (transDecompressMsg(NULL, 0, NULL)) {
// add compress later
// pHead = rpcDecompressRpcMsg(pHead);
......@@ -264,6 +249,8 @@ static void uvHandleReq(SConn* pConn) {
// impl later
//
}
SRpcMsg rpcMsg;
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType;
......@@ -271,6 +258,7 @@ static void uvHandleReq(SConn* pConn) {
rpcMsg.ahandle = NULL;
rpcMsg.handle = pConn;
transClearBuffer(&pConn->readBuf);
pConn->ref++;
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
......@@ -280,8 +268,8 @@ static void uvHandleReq(SConn* pConn) {
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
SConn* conn = cli->data;
SConnBuffer* pBuf = &conn->connBuf;
SSrvConn* conn = cli->data;
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
pBuf->len += nread;
tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
......@@ -294,11 +282,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return;
}
if (nread == 0) {
tDebug("conn %p except read", conn);
// destroyConn(conn, true);
return;
}
if (nread != UV_EOF) {
if (nread < 0 || nread != UV_EOF) {
if (conn->ref > 1) {
conn->ref++; // ref > 1 signed that write is in progress
}
tDebug("conn %p read error: %s", conn, uv_err_name(nread));
destroyConn(conn, true);
}
......@@ -310,25 +299,21 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
void uvOnTimeoutCb(uv_timer_t* handle) {
// opt
SConn* pConn = handle->data;
SSrvConn* pConn = handle->data;
tDebug("conn %p time out", pConn);
}
void uvOnWriteCb(uv_write_t* req, int status) {
SConn* conn = req->data;
SConnBuffer* buf = &conn->connBuf;
buf->len = 0;
memset(buf->buf, 0, buf->cap);
buf->left = -1;
SRpcMsg* pMsg = &conn->sendMsg;
transFreeMsg(pMsg->pCont);
SSrvConn* conn = req->data;
SSrvMsg* smsg = conn->pSrvMsg;
destroySrvMsg(conn);
transClearBuffer(&conn->readBuf);
if (status == 0) {
tDebug("conn %p data already was written on stream", conn);
} else {
tDebug("conn %p failed to write data, %s", conn, uv_err_name(status));
//
destroyConn(conn, true);
}
// opt
......@@ -341,16 +326,16 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
}
}
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
// impl later;
tDebug("conn %p prepare to send resp", conn);
SRpcMsg* pMsg = &conn->sendMsg;
tDebug("conn %p prepare to send resp", smsg->pConn);
SRpcMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->msgType = conn->inType + 1;
pHead->msgType = smsg->pConn->inType + 1;
// add more info
char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen);
......@@ -361,28 +346,55 @@ static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
wb->base = msg;
wb->len = len;
}
static void uvStartSendResp(SSrvMsg* smsg) {
// impl
uv_buf_t wb;
uvPrepareSendData(smsg, &wb);
SSrvConn* pConn = smsg->pConn;
uv_timer_stop(pConn->pTimer);
pConn->pSrvMsg = smsg;
// conn->pWriter->data = smsg;
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
// SRpcMsg* rpcMsg = smsg->msg;
return;
}
static void destroySrvMsg(SSrvConn* conn) {
SSrvMsg* smsg = conn->pSrvMsg;
if (smsg == NULL) {
return;
}
transFreeMsg(smsg->msg.pCont);
free(conn->pSrvMsg);
conn->pSrvMsg = NULL;
}
void uvWorkerAsyncCb(uv_async_t* handle) {
SWorkThrdObj* pThrd = handle->data;
SConn* conn = NULL;
SSrvConn* conn = NULL;
queue wq;
// batch process to avoid to lock/unlock frequently
pthread_mutex_lock(&pThrd->connMtx);
QUEUE_MOVE(&pThrd->conn, &wq);
pthread_mutex_unlock(&pThrd->connMtx);
pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_MOVE(&pThrd->msg, &wq);
pthread_mutex_unlock(&pThrd->msgMtx);
while (!QUEUE_IS_EMPTY(&wq)) {
queue* head = QUEUE_HEAD(&wq);
QUEUE_REMOVE(head);
SConn* conn = QUEUE_DATA(head, SConn, queue);
if (conn == NULL) {
tError("except occurred, do nothing");
return;
SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
if (msg == NULL) {
tError("except occurred, continue");
continue;
}
uv_buf_t wb;
uvPrepareSendData(conn, &wb);
uv_timer_stop(conn->pTimer);
uvStartSendResp(msg);
// uv_buf_t wb;
// uvPrepareSendData(msg, &wb);
// uv_timer_stop(conn->pTimer);
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
}
}
......@@ -435,7 +447,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP);
SConn* pConn = createConn();
SSrvConn* pConn = createConn();
pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/
......@@ -484,8 +496,8 @@ static bool addHandleToWorkloop(void* arg) {
pThrd->pipe->data = pThrd;
QUEUE_INIT(&pThrd->conn);
pthread_mutex_init(&pThrd->connMtx, NULL);
QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL);
pThrd->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
......@@ -523,34 +535,42 @@ void* workerThread(void* arg) {
uv_run(pThrd->loop, UV_RUN_DEFAULT);
}
static SConn* createConn() {
SConn* pConn = (SConn*)calloc(1, sizeof(SConn));
static SSrvConn* createConn() {
SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
tDebug("conn %p created", pConn);
++pConn->ref;
return pConn;
}
static void destroyConn(SConn* conn, bool clear) {
static void destroyConn(SSrvConn* conn, bool clear) {
if (conn == NULL) {
return;
}
if (--conn->ref == 0) {
// SRpcMsg* pMsg = &conn->sendMsg;
// transFreeMsg(pMsg->pCont);
// pMsg->pCont = NULL;
tDebug("conn %p try to destroy", conn);
if (--conn->ref > 0) {
return;
}
transDestroyBuffer(&conn->readBuf);
destroySrvMsg(conn);
if (clear) {
uv_close((uv_handle_t*)conn->pTcp, NULL);
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
}
}
static void uvDestroyConn(uv_handle_t* handle) {
SSrvConn* conn = handle->data;
tDebug("conn %p destroy", conn);
uv_timer_stop(conn->pTimer);
free(conn->pTimer);
free(conn->pTcp);
free(conn->connBuf.buf);
// free(conn->pTcp);
free(conn->pWriter);
free(conn);
}
static void uvDestroyConn(uv_handle_t* handle) {
SConn* conn = handle->data;
destroyConn(conn, false);
}
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) {
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
STransMsgHead* pHead = (STransMsgHead*)msg;
if (pConn->spi && pConn->secured == 0) {
......@@ -632,6 +652,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
pthread_join(pThrd->thread, NULL);
// free(srv->pipe[i]);
free(pThrd->loop);
pthread_mutex_destroy(&pThrd->msgMtx);
free(pThrd);
}
void taosCloseServer(void* arg) {
......@@ -648,17 +669,20 @@ void taosCloseServer(void* arg) {
}
void rpcSendResponse(const SRpcMsg* pMsg) {
SConn* pConn = pMsg->handle;
SSrvConn* pConn = pMsg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
// opt later
pConn->sendMsg = *pMsg;
pthread_mutex_lock(&pThrd->connMtx);
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pthread_mutex_unlock(&pThrd->connMtx);
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
srvMsg->pConn = pConn;
srvMsg->msg = *pMsg;
pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("conn %p start to send resp", pConn);
uv_async_send(pConn->pWorkerAsync);
uv_async_send(pThrd->workerAsync);
}
#endif
......@@ -64,6 +64,7 @@ static void *sendRequest(void *param) {
// tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem);
tDebug("recv response succefully");
// usleep(100000000);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册