提交 81058010 编写于 作者: dengyihao's avatar dengyihao

refactor rpc

上级 9c3a3c10
......@@ -21,6 +21,7 @@ typedef struct SCliConn {
uv_connect_t connReq;
uv_stream_t* stream;
uv_write_t* writeReq;
void* hostThrd;
SConnBuffer readBuf;
void* data;
queue conn;
......@@ -45,7 +46,7 @@ typedef struct SCliThrdObj {
queue msg;
pthread_mutex_t msgMtx;
uint64_t nextTimeout; // next timeout
void* shandle; //
void* pTransInst; //
} SCliThrdObj;
......@@ -69,7 +70,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn*
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
// process data read from server, auth/decompress etc
// process data read from server, auth/decompress etc later
static void clientProcessData(SCliConn* conn);
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
......@@ -91,7 +92,7 @@ static void* clientThread(void* arg);
static void clientProcessData(SCliConn* conn) {
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
SRpcInfo* pRpc = pCtx->ahandle;
SRpcInfo* pRpc = pCtx->pRpc;
SRpcMsg rpcMsg;
rpcMsg.pCont = conn->readBuf.buf;
......@@ -104,7 +105,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->shandle;
SRpcInfo* pRpc = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout;
SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL);
......@@ -127,7 +128,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
static void* connCacheCreate(int size) {
SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
return false;
return cache;
static void* connCacheDestroy(void* cache) {
SConnList* connList = taosHashIterate((SHashObj*)cache, NULL);
......@@ -153,8 +154,9 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
if (plist == NULL) {
SConnList list;
plist = &list;
taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist));
plist = taosHashGet(pCache, key, strlen(key));
if (QUEUE_IS_EMPTY(&plist->conn)) {
......@@ -169,8 +171,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn)
tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx;
SRpcInfo* pRpc = ctx->pRpc;
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key));
// list already create before
......@@ -200,10 +201,11 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size,
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
pBuf->buf = (char*)calloc(1, CAPACITY * sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf;
buf->len = CAPACITY;
} else {
......@@ -213,7 +215,7 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size,
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
buf->base = pBuf->buf + pBuf->len;
......@@ -227,7 +229,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) {
pBuf->len += nread;
if (clientReadComplete(pBuf)) {
tDebug("alread read complete pack");
tDebug("alread read complete");
} else {
tDebug("read halp packet, continue to read");
......@@ -260,7 +262,12 @@ static void clientWriteCb(uv_write_t* req, int status) {
uv_close((uv_handle_t*)pConn->stream, clientDestroy);
SCliThrdObj* pThrd = pConn->hostThrd;
if (pConn->stream == NULL) {
pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
pConn->stream->data = pConn;
uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb);
// impl later
......@@ -270,35 +277,35 @@ static void clientWrite(SCliConn* pConn) {
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen);
char* msg = (char*)(pHead);
int msgLen = transMsgLenFromCont(pMsg->contLen);
uv_buf_t wb = uv_buf_init(msg, msgLen);
pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("data write out, msgType : %d, len: %d", pHead->msgType, msgLen);
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
static void clientConnCb(uv_connect_t* req, int status) {
// impl later
SCliConn* pConn = req->data;
if (status != 0) {
tError("failed to connect %s", uv_err_name(status));
SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = ((SCliMsg*)(pConn->data))->ctx;
SCliMsg* pMsg = pConn->data;
SRpcMsg rpcMsg;
rpcMsg.ahandle = pCtx->ahandle;
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pRpc;
if (status != 0) {
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
tError("failed to connect server, errmsg: %s", uv_strerror(status));
// call user fp later
tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
SRpcInfo* pRpc = pMsg->ctx->pRpc;
SRpcMsg rpcMsg;
rpcMsg.ahandle = pCtx->ahandle;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
uv_close((uv_handle_t*)req->handle, clientDestroy);
assert(pConn->stream == req->handle);
......@@ -315,17 +322,27 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
// impl later
conn->data = pMsg;
conn->writeReq->data = conn;
conn->readBuf.len = 0;
memset(conn->readBuf.buf, 0, conn->readBuf.cap);
conn->readBuf.left = -1;
} else {
SCliConn* conn = calloc(1, sizeof(SCliConn));
// read/write stream handle
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn;
// write req handle
conn->writeReq = malloc(sizeof(uv_write_t));
conn->writeReq->data = conn;
conn->connReq.data = conn;
conn->data = pMsg;
conn->hostThrd = pThrd;
struct sockaddr_in addr;
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
......@@ -359,23 +376,24 @@ static void clientAsyncCb(uv_async_t* handle) {
static void* clientThread(void* arg) {
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
SRpcInfo* pRpc = pThrd->shandle;
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0);
uv_run(pThrd->loop, UV_RUN_DEFAULT);
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SClientObj* cli = calloc(1, sizeof(SClientObj));
SRpcInfo* pRpc = shandle;
memcpy(cli->label, label, strlen(label));
cli->numOfThreads = numOfThreads;
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
for (int i = 0; i < cli->numOfThreads; i++) {
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
pthread_mutex_init(&pThrd->msgMtx, NULL);
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
......@@ -385,8 +403,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
pThrd->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->pTimer);
pThrd->pTimer->data = pThrd;
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
pThrd->shandle = shandle;
pThrd->cache = connCacheCreate(1);
pThrd->pTransInst = shandle;
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
if (err == 0) {
......@@ -24,13 +24,15 @@ typedef struct SConn {
uv_async_t* pWorkerAsync;
queue queue;
int ref;
int persist; // persist connection or not
SConnBuffer connBuf; // read buf,
SConnBuffer writeBuf; // write buf
int persist; // persist connection or not
SConnBuffer connBuf; // read buf,
int count;
void* shandle; // rpc init
void* ahandle; //
int inType;
void* pTransInst; // rpc init
void* ahandle; //
void* hostThrd;
SRpcMsg sendMsg;
// del later
char secured;
int spi;
......@@ -48,7 +50,7 @@ typedef struct SWorkThrdObj {
uv_async_t* workerAsync; //
queue conn;
pthread_mutex_t connMtx;
void* shandle;
void* pTransInst;
} SWorkThrdObj;
typedef struct SServerObj {
......@@ -66,7 +68,7 @@ typedef struct SServerObj {
static const char* notify = "a";
// refactor later
static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen);
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen);
static int uvAuthMsg(SConn* pConn, char* msg, int msgLen);
......@@ -75,10 +77,13 @@ static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status);
static void uvOnPipeWriteCb(uv_write_t* req, int status);
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb);
// already read complete packet
static bool readComplete(SConnBuffer* buf);
......@@ -135,25 +140,28 @@ static bool readComplete(SConnBuffer* data) {
if (msgLen > data->len) {
data->left = msgLen - data->len;
return false;
} else {
} else if (msgLen == data->len) {
return true;
} else if (msgLen < data->len) {
return false;
// handle other packet later
} else {
return false;
static void uvDoProcess(SRecvInfo* pRecv) {
// impl later
STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
SConn* pConn = pRecv->thandle;
tDump(pRecv->msg, pRecv->msgLen);
terrno = 0;
// SRpcReqContext* pContest;
// do auth and check
// static void uvDoProcess(SRecvInfo* pRecv) {
// // impl later
// STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
// SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
// SConn* pConn = pRecv->thandle;
// tDump(pRecv->msg, pRecv->msgLen);
// terrno = 0;
// // SRpcReqContext* pContest;
// // do auth and check
static int uvAuthMsg(SConn* pConn, char* msg, int len) {
STransMsgHead* pHead = (STransMsgHead*)msg;
......@@ -222,12 +230,13 @@ static void uvProcessData(SConn* pConn) {
p->msgLen = pBuf->len;
p->ip = 0;
p->port = 0;
p->shandle = pConn->shandle; //
p->shandle = pConn->pTransInst; //
p->thandle = pConn;
p->chandle = NULL;
STransMsgHead* pHead = (STransMsgHead*)p->msg;
pConn->inType = pHead->msgType;
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
......@@ -247,7 +256,9 @@ static void uvProcessData(SConn* pConn) {
// add compress later
// pHead = rpcDecompressRpcMsg(pHead);
} else {
pHead->msgLen = htonl(pHead->msgLen);
// impl later
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content;
......@@ -257,7 +268,7 @@ static void uvProcessData(SConn* pConn) {
rpcMsg.handle = pConn;
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0);
uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
// validate msg type
......@@ -277,8 +288,9 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if (nread != UV_EOF) {
tDebug("Read error %s\n", uv_err_name(nread));
tDebug("Read error %s", uv_err_name(nread));
tDebug("read error %s", uv_err_name(nread));
uv_close((uv_handle_t*)cli, uvConnDestroy);
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
......@@ -293,16 +305,48 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnWriteCb(uv_write_t* req, int status) {
SConn* conn = req->data;
SConnBuffer* buf = &conn->connBuf;
buf->len = 0;
memset(buf->buf, 0, buf->cap);
buf->left = -1;
if (status == 0) {
tDebug("data already was written on stream");
} else {
tDebug("failed to write data, %s", uv_err_name(status));
// opt
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
if (status == 0) {
tDebug("success to dispatch conn to work thread");
} else {
tError("fail to dispatch conn to work thread");
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
// impl later
SRpcMsg* pMsg = &conn->sendMsg;
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->msgType = conn->inType + 1;
// add more info
char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen);
if (transCompressMsg(msg, len, NULL)) {
// impl later
pHead->msgLen = htonl(len);
wb->base = msg;
wb->len = len;
void uvWorkerAsyncCb(uv_async_t* handle) {
SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync);
SWorkThrdObj* pThrd = handle->data;
SConn* conn = NULL;
queue wq;
// batch process to avoid to lock/unlock frequently
......@@ -318,8 +362,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError("except occurred, do nothing");
uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
uv_buf_t wb;
uvPrepareSendData(conn, &wb);
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
......@@ -341,8 +385,9 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb);
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
} else {
uv_close((uv_handle_t*)cli, NULL);
......@@ -374,7 +419,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert(pending == UV_TCP);
SConn* pConn = connCreate();
pConn->shandle = pThrd->shandle;
pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pConn->pTimer);
......@@ -398,6 +443,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tDebug("new connection created: %d", fd);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
} else {
tDebug("failed to create new connection");
......@@ -418,14 +464,12 @@ void* acceptThread(void* arg) {
uv_run(srv->loop, UV_RUN_DEFAULT);
void* workerThread(void* arg) {
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
static void initWorkThrdObj(SWorkThrdObj* pThrd) {
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
// SRpcInfo* pRpc = pThrd->shandle;
uv_pipe_init(pThrd->loop, pThrd->pipe, 0);
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd->pipe->data = pThrd;
......@@ -435,8 +479,12 @@ void* workerThread(void* arg) {
pThrd->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
pThrd->workerAsync->data = pThrd;
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
void* workerThread(void* arg) {
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
uv_run(pThrd->loop, UV_RUN_DEFAULT);
......@@ -444,34 +492,39 @@ static SConn* connCreate() {
SConn* pConn = (SConn*)calloc(1, sizeof(SConn));
return pConn;
static void connCloseCb(uv_handle_t* handle) {
// impl later
static void connDestroy(SConn* conn) {
if (conn == NULL) {
uv_close((uv_handle_t*)conn->pTcp, NULL);
// uv_close((uv_handle_t*)conn->pTcp, connCloseCb);
// free(conn);
// handle
static void uvConnDestroy(uv_handle_t* handle) {
SConn* conn = handle->data;
static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen) {
SRpcHead* pHead = (SRpcHead*)msg;
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) {
STransMsgHead* pHead = (STransMsgHead*)msg;
if (pConn->spi && pConn->secured == 0) {
// add auth part
pHead->spi = pConn->spi;
SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen);
STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(SRpcDigest);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
// transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
// transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHead->spi = 0;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
......@@ -502,9 +555,11 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
thrd->shandle = shandle;
thrd->pTransInst = shandle;
thrd->fd = fds[0];
thrd->pipe = &(srv->pipe[i][1]); // init read
int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
if (err == 0) {
tDebug("sucess to create worker-thread %d", i);
......@@ -547,6 +602,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
SWorkThrdObj* pThrd = pConn->hostThrd;
// opt later
pConn->sendMsg = *pMsg;
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册