未验证 提交 166e72a2 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #9966 from taosdata/origin/feature/rpc-refactor

refactor rpc
...@@ -21,6 +21,7 @@ typedef struct SCliConn { ...@@ -21,6 +21,7 @@ typedef struct SCliConn {
uv_connect_t connReq; uv_connect_t connReq;
uv_stream_t* stream; uv_stream_t* stream;
uv_write_t* writeReq; uv_write_t* writeReq;
void* hostThrd;
SConnBuffer readBuf; SConnBuffer readBuf;
void* data; void* data;
queue conn; queue conn;
...@@ -45,7 +46,7 @@ typedef struct SCliThrdObj { ...@@ -45,7 +46,7 @@ typedef struct SCliThrdObj {
queue msg; queue msg;
pthread_mutex_t msgMtx; pthread_mutex_t msgMtx;
uint64_t nextTimeout; // next timeout uint64_t nextTimeout; // next timeout
void* shandle; // void* pTransInst; //
} SCliThrdObj; } SCliThrdObj;
...@@ -69,7 +70,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* ...@@ -69,7 +70,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn*
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle); static void clientTimeoutCb(uv_timer_t* handle);
// process data read from server, auth/decompress etc // process data read from server, auth/decompress etc later
static void clientProcessData(SCliConn* conn); static void clientProcessData(SCliConn* conn);
// check whether already read complete packet from server // check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf); static bool clientReadComplete(SConnBuffer* pBuf);
...@@ -91,20 +92,25 @@ static void* clientThread(void* arg); ...@@ -91,20 +92,25 @@ static void* clientThread(void* arg);
static void clientProcessData(SCliConn* conn) { static void clientProcessData(SCliConn* conn) {
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
SRpcInfo* pRpc = pCtx->ahandle; SRpcInfo* pRpc = pCtx->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.pCont = conn->readBuf.buf; rpcMsg.pCont = conn->readBuf.buf;
rpcMsg.contLen = conn->readBuf.len; rpcMsg.contLen = conn->readBuf.len;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
(pRpc->cfp)(NULL, &rpcMsg, NULL); (pRpc->cfp)(NULL, &rpcMsg, NULL);
SCliThrdObj* pThrd = conn->hostThrd;
addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn);
free(pCtx->ip);
free(pCtx);
// impl // impl
} }
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientTimeoutCb(uv_timer_t* handle) { static void clientTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data; SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->shandle; SRpcInfo* pRpc = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; int64_t currentTime = pThrd->nextTimeout;
SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL);
...@@ -127,7 +133,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { ...@@ -127,7 +133,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
} }
static void* connCacheCreate(int size) { static void* connCacheCreate(int size) {
SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
return false; return cache;
} }
static void* connCacheDestroy(void* cache) { static void* connCacheDestroy(void* cache) {
SConnList* connList = taosHashIterate((SHashObj*)cache, NULL); SConnList* connList = taosHashIterate((SHashObj*)cache, NULL);
...@@ -153,8 +159,9 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { ...@@ -153,8 +159,9 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
if (plist == NULL) { if (plist == NULL) {
SConnList list; SConnList list;
plist = &list; plist = &list;
QUEUE_INIT(&plist->conn);
taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist)); taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist));
plist = taosHashGet(pCache, key, strlen(key));
QUEUE_INIT(&plist->conn);
} }
if (QUEUE_IS_EMPTY(&plist->conn)) { if (QUEUE_IS_EMPTY(&plist->conn)) {
...@@ -169,8 +176,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) ...@@ -169,8 +176,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn)
tstrncpy(key, ip, strlen(ip)); tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx; SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
SRpcInfo* pRpc = ctx->pRpc;
conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key));
// list already create before // list already create before
...@@ -200,10 +206,11 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, ...@@ -200,10 +206,11 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size,
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (pBuf->cap == 0) { if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char)); pBuf->buf = (char*)calloc(1, CAPACITY * sizeof(char));
pBuf->len = 0; pBuf->len = 0;
pBuf->cap = CAPACITY; pBuf->cap = CAPACITY;
pBuf->left = -1; pBuf->left = -1;
buf->base = pBuf->buf; buf->base = pBuf->buf;
buf->len = CAPACITY; buf->len = CAPACITY;
} else { } else {
...@@ -213,7 +220,7 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, ...@@ -213,7 +220,7 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size,
pBuf->buf = realloc(pBuf->buf, pBuf->cap); pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} else if (pBuf->len + pBuf->left > pBuf->cap) { } else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left; pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left); pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} }
} }
buf->base = pBuf->buf + pBuf->len; buf->base = pBuf->buf + pBuf->len;
...@@ -227,7 +234,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -227,7 +234,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (clientReadComplete(pBuf)) { if (clientReadComplete(pBuf)) {
tDebug("alread read complete pack"); tDebug("alread read complete");
clientProcessData(conn); clientProcessData(conn);
} else { } else {
tDebug("read halp packet, continue to read"); tDebug("read halp packet, continue to read");
...@@ -260,7 +267,12 @@ static void clientWriteCb(uv_write_t* req, int status) { ...@@ -260,7 +267,12 @@ static void clientWriteCb(uv_write_t* req, int status) {
uv_close((uv_handle_t*)pConn->stream, clientDestroy); uv_close((uv_handle_t*)pConn->stream, clientDestroy);
return; return;
} }
SCliThrdObj* pThrd = pConn->hostThrd;
if (pConn->stream == NULL) {
pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
pConn->stream->data = pConn;
}
uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb); uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb);
// impl later // impl later
} }
...@@ -270,35 +282,35 @@ static void clientWrite(SCliConn* pConn) { ...@@ -270,35 +282,35 @@ static void clientWrite(SCliConn* pConn) {
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen); int msgLen = transMsgLenFromCont(pMsg->contLen);
char* msg = (char*)(pHead);
pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
uv_buf_t wb = uv_buf_init(msg, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("data write out, msgType : %d, len: %d", pHead->msgType, msgLen);
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
} }
static void clientConnCb(uv_connect_t* req, int status) { static void clientConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status != 0) { SCliMsg* pMsg = pConn->data;
tError("failed to connect %s", uv_err_name(status));
clientConnDestroy(pConn);
return;
}
SCliMsg* pMsg = pConn->data; STransConnCtx* pCtx = pMsg->ctx;
STransConnCtx* pCtx = ((SCliMsg*)(pConn->data))->ctx; SRpcInfo* pRpc = pCtx->pRpc;
SRpcMsg rpcMsg;
rpcMsg.ahandle = pCtx->ahandle;
if (status != 0) { if (status != 0) {
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
tError("failed to connect server, errmsg: %s", uv_strerror(status));
// call user fp later // call user fp later
tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); SRpcMsg rpcMsg;
SRpcInfo* pRpc = pMsg->ctx->pRpc; rpcMsg.ahandle = pCtx->ahandle;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL); (pRpc->cfp)(NULL, &rpcMsg, NULL);
uv_close((uv_handle_t*)req->handle, clientDestroy); uv_close((uv_handle_t*)req->handle, clientDestroy);
return; return;
} }
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
clientWrite(pConn); clientWrite(pConn);
} }
...@@ -315,17 +327,27 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -315,17 +327,27 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
// impl later // impl later
conn->data = pMsg; conn->data = pMsg;
conn->writeReq->data = conn; conn->writeReq->data = conn;
conn->readBuf.len = 0;
memset(conn->readBuf.buf, 0, conn->readBuf.cap);
conn->readBuf.left = -1;
clientWrite(conn); clientWrite(conn);
} else { } else {
SCliConn* conn = calloc(1, sizeof(SCliConn)); SCliConn* conn = calloc(1, sizeof(SCliConn));
// read/write stream handle
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn;
// write req handle
conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq = malloc(sizeof(uv_write_t));
conn->writeReq->data = conn;
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
conn->connReq.data = conn; conn->connReq.data = conn;
conn->data = pMsg; conn->data = pMsg;
conn->hostThrd = pThrd;
struct sockaddr_in addr; struct sockaddr_in addr;
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
...@@ -359,23 +381,24 @@ static void clientAsyncCb(uv_async_t* handle) { ...@@ -359,23 +381,24 @@ static void clientAsyncCb(uv_async_t* handle) {
static void* clientThread(void* arg) { static void* clientThread(void* arg) {
SCliThrdObj* pThrd = (SCliThrdObj*)arg; SCliThrdObj* pThrd = (SCliThrdObj*)arg;
SRpcInfo* pRpc = pThrd->shandle;
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0);
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SClientObj* cli = calloc(1, sizeof(SClientObj)); SClientObj* cli = calloc(1, sizeof(SClientObj));
SRpcInfo* pRpc = shandle;
memcpy(cli->label, label, strlen(label)); memcpy(cli->label, label, strlen(label));
cli->numOfThreads = numOfThreads; cli->numOfThreads = numOfThreads;
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
for (int i = 0; i < cli->numOfThreads; i++) { for (int i = 0; i < cli->numOfThreads; i++) {
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL); pthread_mutex_init(&pThrd->msgMtx, NULL);
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
...@@ -385,8 +408,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -385,8 +408,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
pThrd->pTimer = malloc(sizeof(uv_timer_t)); pThrd->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->pTimer); uv_timer_init(pThrd->loop, pThrd->pTimer);
pThrd->pTimer->data = pThrd;
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
pThrd->shandle = shandle; pThrd->cache = connCacheCreate(1);
pThrd->pTransInst = shandle;
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
if (err == 0) { if (err == 0) {
......
...@@ -24,13 +24,15 @@ typedef struct SConn { ...@@ -24,13 +24,15 @@ typedef struct SConn {
uv_async_t* pWorkerAsync; uv_async_t* pWorkerAsync;
queue queue; queue queue;
int ref; int ref;
int persist; // persist connection or not int persist; // persist connection or not
SConnBuffer connBuf; // read buf, SConnBuffer connBuf; // read buf,
SConnBuffer writeBuf; // write buf
int count; int count;
void* shandle; // rpc init int inType;
void* ahandle; // void* pTransInst; // rpc init
void* ahandle; //
void* hostThrd; void* hostThrd;
SRpcMsg sendMsg;
// del later // del later
char secured; char secured;
int spi; int spi;
...@@ -48,7 +50,7 @@ typedef struct SWorkThrdObj { ...@@ -48,7 +50,7 @@ typedef struct SWorkThrdObj {
uv_async_t* workerAsync; // uv_async_t* workerAsync; //
queue conn; queue conn;
pthread_mutex_t connMtx; pthread_mutex_t connMtx;
void* shandle; void* pTransInst;
} SWorkThrdObj; } SWorkThrdObj;
typedef struct SServerObj { typedef struct SServerObj {
...@@ -66,7 +68,7 @@ typedef struct SServerObj { ...@@ -66,7 +68,7 @@ typedef struct SServerObj {
static const char* notify = "a"; static const char* notify = "a";
// refactor later // refactor later
static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen); static int transAddAuthPart(SConn* pConn, char* msg, int msgLen);
static int uvAuthMsg(SConn* pConn, char* msg, int msgLen); static int uvAuthMsg(SConn* pConn, char* msg, int msgLen);
...@@ -75,10 +77,13 @@ static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b ...@@ -75,10 +77,13 @@ static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle); static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status); static void uvOnWriteCb(uv_write_t* req, int status);
static void uvOnPipeWriteCb(uv_write_t* req, int status);
static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle); static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb);
// already read complete packet // already read complete packet
static bool readComplete(SConnBuffer* buf); static bool readComplete(SConnBuffer* buf);
...@@ -135,25 +140,28 @@ static bool readComplete(SConnBuffer* data) { ...@@ -135,25 +140,28 @@ static bool readComplete(SConnBuffer* data) {
if (msgLen > data->len) { if (msgLen > data->len) {
data->left = msgLen - data->len; data->left = msgLen - data->len;
return false; return false;
} else { } else if (msgLen == data->len) {
return true; return true;
} else if (msgLen < data->len) {
return false;
// handle other packet later
} }
} else { } else {
return false; return false;
} }
} }
static void uvDoProcess(SRecvInfo* pRecv) { // static void uvDoProcess(SRecvInfo* pRecv) {
// impl later // // impl later
STransMsgHead* pHead = (STransMsgHead*)pRecv->msg; // STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; // SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
SConn* pConn = pRecv->thandle; // SConn* pConn = pRecv->thandle;
tDump(pRecv->msg, pRecv->msgLen); // tDump(pRecv->msg, pRecv->msgLen);
terrno = 0; // terrno = 0;
// SRpcReqContext* pContest; // // SRpcReqContext* pContest;
//
// do auth and check // // do auth and check
} //}
static int uvAuthMsg(SConn* pConn, char* msg, int len) { static int uvAuthMsg(SConn* pConn, char* msg, int len) {
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
...@@ -222,12 +230,13 @@ static void uvProcessData(SConn* pConn) { ...@@ -222,12 +230,13 @@ static void uvProcessData(SConn* pConn) {
p->msgLen = pBuf->len; p->msgLen = pBuf->len;
p->ip = 0; p->ip = 0;
p->port = 0; p->port = 0;
p->shandle = pConn->shandle; // p->shandle = pConn->pTransInst; //
p->thandle = pConn; p->thandle = pConn;
p->chandle = NULL; p->chandle = NULL;
//
STransMsgHead* pHead = (STransMsgHead*)p->msg; STransMsgHead* pHead = (STransMsgHead*)p->msg;
pConn->inType = pHead->msgType;
assert(transIsReq(pHead->msgType)); assert(transIsReq(pHead->msgType));
SRpcInfo* pRpc = (SRpcInfo*)p->shandle; SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
...@@ -247,7 +256,9 @@ static void uvProcessData(SConn* pConn) { ...@@ -247,7 +256,9 @@ static void uvProcessData(SConn* pConn) {
// add compress later // add compress later
// pHead = rpcDecompressRpcMsg(pHead); // pHead = rpcDecompressRpcMsg(pHead);
} else { } else {
pHead->msgLen = htonl(pHead->msgLen);
// impl later // impl later
//
} }
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
...@@ -257,7 +268,7 @@ static void uvProcessData(SConn* pConn) { ...@@ -257,7 +268,7 @@ static void uvProcessData(SConn* pConn) {
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0); uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth // auth
// validate msg type // validate msg type
} }
...@@ -277,8 +288,9 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -277,8 +288,9 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
if (nread != UV_EOF) { if (nread != UV_EOF) {
tDebug("Read error %s\n", uv_err_name(nread)); tDebug("Read error %s", uv_err_name(nread));
} }
tDebug("read error %s", uv_err_name(nread));
uv_close((uv_handle_t*)cli, uvConnDestroy); uv_close((uv_handle_t*)cli, uvConnDestroy);
} }
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
...@@ -293,16 +305,48 @@ void uvOnTimeoutCb(uv_timer_t* handle) { ...@@ -293,16 +305,48 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnWriteCb(uv_write_t* req, int status) { void uvOnWriteCb(uv_write_t* req, int status) {
SConn* conn = req->data; SConn* conn = req->data;
SConnBuffer* buf = &conn->connBuf;
buf->len = 0;
memset(buf->buf, 0, buf->cap);
buf->left = -1;
if (status == 0) { if (status == 0) {
tDebug("data already was written on stream"); tDebug("data already was written on stream");
} else { } else {
tDebug("failed to write data, %s", uv_err_name(status));
connDestroy(conn); connDestroy(conn);
} }
// opt // opt
} }
static void uvOnPipeWriteCb(uv_write_t* req, int status) {
if (status == 0) {
tDebug("success to dispatch conn to work thread");
} else {
tError("fail to dispatch conn to work thread");
}
}
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
// impl later
SRpcMsg* pMsg = &conn->sendMsg;
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->msgType = conn->inType + 1;
// add more info
char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen);
if (transCompressMsg(msg, len, NULL)) {
// impl later
}
pHead->msgLen = htonl(len);
wb->base = msg;
wb->len = len;
}
void uvWorkerAsyncCb(uv_async_t* handle) { void uvWorkerAsyncCb(uv_async_t* handle) {
SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync); SWorkThrdObj* pThrd = handle->data;
SConn* conn = NULL; SConn* conn = NULL;
queue wq; queue wq;
// batch process to avoid to lock/unlock frequently // batch process to avoid to lock/unlock frequently
...@@ -318,8 +362,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -318,8 +362,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError("except occurred, do nothing"); tError("except occurred, do nothing");
return; return;
} }
uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); uv_buf_t wb;
uvPrepareSendData(conn, &wb);
uv_timer_stop(conn->pTimer); uv_timer_stop(conn->pTimer);
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
...@@ -341,8 +385,9 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { ...@@ -341,8 +385,9 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
} else { } else {
uv_close((uv_handle_t*)cli, NULL); uv_close((uv_handle_t*)cli, NULL);
} }
...@@ -374,7 +419,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -374,7 +419,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert(pending == UV_TCP); assert(pending == UV_TCP);
SConn* pConn = connCreate(); SConn* pConn = connCreate();
pConn->shandle = pThrd->shandle; pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/ /* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t)); pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pConn->pTimer); uv_timer_init(pThrd->loop, pConn->pTimer);
...@@ -398,6 +443,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -398,6 +443,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tDebug("new connection created: %d", fd); tDebug("new connection created: %d", fd);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
} else { } else {
tDebug("failed to create new connection");
connDestroy(pConn); connDestroy(pConn);
} }
} }
...@@ -418,14 +464,12 @@ void* acceptThread(void* arg) { ...@@ -418,14 +464,12 @@ void* acceptThread(void* arg) {
} }
uv_run(srv->loop, UV_RUN_DEFAULT); uv_run(srv->loop, UV_RUN_DEFAULT);
} }
void* workerThread(void* arg) { static void initWorkThrdObj(SWorkThrdObj* pThrd) {
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
// SRpcInfo* pRpc = pThrd->shandle; // SRpcInfo* pRpc = pThrd->shandle;
uv_pipe_init(pThrd->loop, pThrd->pipe, 0); uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd); uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd->pipe->data = pThrd; pThrd->pipe->data = pThrd;
...@@ -435,8 +479,12 @@ void* workerThread(void* arg) { ...@@ -435,8 +479,12 @@ void* workerThread(void* arg) {
pThrd->workerAsync = malloc(sizeof(uv_async_t)); pThrd->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
pThrd->workerAsync->data = pThrd;
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
}
void* workerThread(void* arg) {
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
...@@ -444,34 +492,39 @@ static SConn* connCreate() { ...@@ -444,34 +492,39 @@ static SConn* connCreate() {
SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); SConn* pConn = (SConn*)calloc(1, sizeof(SConn));
return pConn; return pConn;
} }
static void connCloseCb(uv_handle_t* handle) {
// impl later
//
}
static void connDestroy(SConn* conn) { static void connDestroy(SConn* conn) {
if (conn == NULL) { if (conn == NULL) {
return; return;
} }
uv_timer_stop(conn->pTimer); uv_timer_stop(conn->pTimer);
free(conn->pTimer); free(conn->pTimer);
uv_close((uv_handle_t*)conn->pTcp, NULL); // uv_close((uv_handle_t*)conn->pTcp, connCloseCb);
free(conn->connBuf.buf);
free(conn->pTcp); free(conn->pTcp);
free(conn->connBuf.buf);
free(conn->pWriter); free(conn->pWriter);
free(conn); // free(conn);
// handle // handle
} }
static void uvConnDestroy(uv_handle_t* handle) { static void uvConnDestroy(uv_handle_t* handle) {
SConn* conn = handle->data; SConn* conn = handle->data;
connDestroy(conn); connDestroy(conn);
} }
static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen) { static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) {
SRpcHead* pHead = (SRpcHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
if (pConn->spi && pConn->secured == 0) { if (pConn->spi && pConn->secured == 0) {
// add auth part // add auth part
pHead->spi = pConn->spi; pHead->spi = pConn->spi;
SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen); STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
pDigest->timeStamp = htonl(taosGetTimestampSec()); pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(SRpcDigest); msgLen += sizeof(SRpcDigest);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
// transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else { } else {
pHead->spi = 0; pHead->spi = 0;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
...@@ -502,9 +555,11 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -502,9 +555,11 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
thrd->shandle = shandle; thrd->pTransInst = shandle;
thrd->fd = fds[0]; thrd->fd = fds[0];
thrd->pipe = &(srv->pipe[i][1]); // init read thrd->pipe = &(srv->pipe[i][1]); // init read
initWorkThrdObj(thrd);
int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
if (err == 0) { if (err == 0) {
tDebug("sucess to create worker-thread %d", i); tDebug("sucess to create worker-thread %d", i);
...@@ -547,6 +602,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) { ...@@ -547,6 +602,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
SWorkThrdObj* pThrd = pConn->hostThrd; SWorkThrdObj* pThrd = pConn->hostThrd;
// opt later // opt later
pConn->sendMsg = *pMsg;
pthread_mutex_lock(&pThrd->connMtx); pthread_mutex_lock(&pThrd->connMtx);
QUEUE_PUSH(&pThrd->conn, &pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pthread_mutex_unlock(&pThrd->connMtx); pthread_mutex_unlock(&pThrd->connMtx);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册