Commit 41f60069 authored by dengyihao

add libuv

Parent a00a8dd9
@@ -22,6 +22,27 @@ extern "C" {
#endif
#ifdef USE_UV
typedef struct {
char version : 4; // RPC version
char comp : 4; // compression algorithm, 0:no compression 1:lz4
char resflag : 2; // reserved bits
char spi : 3; // security parameter index
char encrypt : 3; // encrypt algorithm, 0: no encryption
uint16_t tranId;  // transaction ID
uint32_t linkUid; // for unique connection ID assigned by client
uint64_t ahandle; // ahandle assigned by client
uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario
char user[TSDB_UNI_LEN]; // user ID
uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved
uint16_t msgType; // message type
int32_t msgLen;  // message length including the header itself
uint32_t msgVer;
int32_t code; // code in response message
uint8_t content[0]; // message body starts from here
} SRpcHead;
#else
......
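For context, a hedged sketch of how a sender might fill this header before writing it to the wire. rpcHeadInit is a hypothetical helper, not part of this commit, and it assumes the compiler packs the bit fields exactly as the original TDengine code expects:

#include <arpa/inet.h>  // htonl
#include <string.h>

// hypothetical helper: prepare an SRpcHead for a message carrying a
// bodyLen-byte payload; msgLen travels in network byte order and
// covers the header itself plus the body
static void rpcHeadInit(SRpcHead* pHead, uint16_t msgType, int32_t bodyLen) {
  memset(pHead, 0, sizeof(*pHead));
  pHead->version = 1;  // RPC version
  pHead->comp = 0;     // 0: no compression
  pHead->msgType = msgType;
  pHead->msgLen = (int32_t)htonl((uint32_t)(sizeof(*pHead) + bodyLen));
}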
@@ -13,7 +13,9 @@
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#ifdef USE_UV
#include <uv.h>
#endif
#include "lz4.h" #include "lz4.h"
#include "os.h" #include "os.h"
#include "rpcCache.h" #include "rpcCache.h"
@@ -68,6 +70,8 @@ typedef struct {
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
static const char* notify = "a";
typedef struct SThreadObj {
  pthread_t  thread;
  uv_pipe_t* pipe;
@@ -90,23 +94,39 @@ typedef struct SServerObj {
  uint32_t port;
} SServerObj;
typedef struct SContent {
  char* buf;     // accumulated read buffer
  int   len;     // bytes received so far
  int   cap;     // current buffer capacity
  int   toRead;  // body bytes still expected, -1 if not yet known
} SContent;
typedef struct SConnCtx {
  uv_tcp_t*   pTcp;
  uv_write_t* pWriter;
  uv_timer_t* pTimer;
  uv_async_t* pWorkerAsync;
  queue       queue;
  int         ref;
  int         persist;  // persist connection or not
  SContent    pCont;
  int         count;
} SConnCtx;
static void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void onTimeout(uv_timer_t* handle);
static void onWrite(uv_write_t* req, int status);
static void onAccept(uv_stream_t* stream, int status);
static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void workerAsyncCB(uv_async_t* handle);
static SConnCtx* connCtxCreate();
static void connCtxDestroy(SConnCtx* ctx);
static void uvConnCtxDestroy(uv_handle_t* handle);
static void* workerThread(void* arg);
static void* acceptThread(void* arg);
@@ -131,12 +151,11 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
  for (int i = 0; i < srv->numOfThread; i++) {
    SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj));
    int fds[2];
    if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
      return NULL;
    }
    srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
    uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
    uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write
@@ -147,7 +166,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
      tDebug("success to create worker-thread %d", i);
      // printf("thread %d create\n", i);
    } else {
      // TODO: clear all other resource later
      tError("failed to create worker-thread %d", i);
    }
    srv->pThreadObj[i] = thrd;
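The loop above only opens the write end (srv->pipe[i][0]) on the main loop; presumably the elided lines hand fds[0] to the worker, which opens the read end on its own loop. A hedged sketch of that worker-side wiring, probably at the top of workerThread; pObj->fd is an assumed field name not shown in this diff:

// sketch: worker-side end of the socketpair; ipc = 1 so the pipe can
// carry TCP handles sent from the main thread (see uv_write2 in onAccept)
uv_pipe_init(pObj->loop, pObj->pipe, 1);
uv_pipe_open(pObj->pipe, pObj->fd);  // fd = fds[0], stashed by taosInitServer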
@@ -171,7 +190,6 @@ void* rpcOpen(const SRpcInit* pInit) {
    tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
  }
  pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
  pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
  return pRpc;
}
@@ -190,26 +208,106 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  static const int CAPACITY = 1024;
  tDebug("pre-allocate buffer for read");
  SConnCtx* ctx = handle->data;
  SContent* pCont = &ctx->pCont;
  if (pCont->cap == 0) {
    // first read on this connection: set up the accumulation buffer
    pCont->buf = (char*)calloc(CAPACITY, sizeof(char));
    pCont->len = 0;
    pCont->cap = CAPACITY;
    pCont->toRead = -1;
    buf->base = pCont->buf;
    buf->len = CAPACITY;
  } else {
    if (pCont->len >= pCont->cap) {
      if (pCont->toRead == -1) {
        // message size still unknown: grow geometrically
        pCont->cap *= 2;
        pCont->buf = realloc(pCont->buf, pCont->cap);
      } else if (pCont->len + pCont->toRead > pCont->cap) {
        // message size known: grow to exactly what is left to read
        pCont->cap = pCont->len + pCont->toRead;
        pCont->buf = realloc(pCont->buf, pCont->cap);
      }
    }
    // hand libuv the unused tail of the buffer
    buf->base = pCont->buf + pCont->len;
    buf->len = pCont->cap - pCont->len;
  }
}
// TODO: refine message framing later
static bool handleUserData(SContent* data) {
  SRpcHead rpcHead;
  int32_t  headLen = sizeof(rpcHead);
  if (data->len < headLen) {
    return false;
  }
  memcpy((char*)&rpcHead, data->buf, headLen);
  // msgLen is carried in network byte order and covers header + body
  int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
  return msgLen + headLen <= data->len;
}
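Note that pCont->toRead is consulted in allocReadBuffer but never assigned in the hunks shown. Presumably a later revision records how many body bytes are still missing once the header has been parsed; a hedged sketch of that bookkeeping, reusing msgLen and headLen from handleUserData:

// sketch: after a successful header parse, remember the shortfall so
// allocReadBuffer can size the next allocation exactly
int32_t total = msgLen + headLen;  // full message size
data->toRead = (total > data->len) ? (total - data->len) : 0;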
void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
  // opt
  SConnCtx* ctx = cli->data;
  SContent* pCont = &ctx->pCont;
  if (nread > 0) {
    pCont->len += nread;
    bool finish = handleUserData(pCont);
    if (finish == false) {
      tDebug("continue read");
    } else {
      tDebug("read completely");
    }
    return;
  }
  if (nread != UV_EOF) {
    tError("read error %s", uv_err_name(nread));
  }
  uv_close((uv_handle_t*)cli, uvConnCtxDestroy);
}
void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  // the notify message is a single byte; allocate as much as buf->len advertises
  buf->base = malloc(2);
  buf->len = 2;
}
void onTimeout(uv_timer_t* handle) {
  // opt
  tDebug("time out");
}
void onWrite(uv_write_t* req, int status) {
  SConnCtx* ctx = req->data;
  if (status == 0) {
    tDebug("data was written to the stream");
  } else {
    connCtxDestroy(ctx);
  }
  free(req);
  // opt
}
@@ -243,7 +341,7 @@ void onAccept(uv_stream_t* stream, int status) {
  if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
    uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
    pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
    tDebug("new connection accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
@@ -253,6 +351,7 @@ void onAccept(uv_stream_t* stream, int status) {
  }
}
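The elided lines of onAccept presumably forward the accepted handle to the chosen worker with uv_write2, libuv's mechanism for passing a stream over an IPC pipe. A minimal sketch, reusing wr, buf, and cli from the hunk above; the exact call in the elided lines may differ:

// sketch: send the notify byte plus the accepted TCP handle over the
// worker's pipe; the worker receives it in onConnection via uv_accept
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1,
          (uv_stream_t*)cli, onWrite);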
void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
  tDebug("incoming connection");
  if (nread < 0) {
    if (nread != UV_EOF) {
      tError("read error %s", uv_err_name(nread));
@@ -261,6 +360,11 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
    uv_close((uv_handle_t*)q, NULL);
    return;
  }
  // free the notify buffer allocated by allocConnBuffer
assert(nread == strlen(notify));
assert(buf->base[0] == notify[0]);
free(buf->base);
  SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe);
  uv_pipe_t* pipe = (uv_pipe_t*)q;
@@ -268,30 +372,33 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
    tError("No pending count");
    return;
  }
  uv_handle_type pending = uv_pipe_pending_type(pipe);
  assert(pending == UV_TCP);
  SConnCtx* pConn = connCtxCreate();
  /* init conn timer */
  pConn->pTimer = malloc(sizeof(uv_timer_t));
  uv_timer_init(pObj->loop, pConn->pTimer);
  pConn->pWorkerAsync = pObj->workerAsync;  // thread safety
  // init client handle
  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
  uv_tcp_init(pObj->loop, pConn->pTcp);
  pConn->pTcp->data = pConn;
  // init write request
  pConn->pWriter = calloc(1, sizeof(uv_write_t));
  pConn->pWriter->data = pConn;
  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
    uv_os_fd_t fd;
    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
    tDebug("new connection created: %d", fd);
    uv_read_start((uv_stream_t*)(pConn->pTcp), allocReadBuffer, onRead);
  } else {
    connCtxDestroy(pConn);
  }
}
@@ -325,11 +432,30 @@ void* workerThread(void* arg) {
  pObj->workerAsync = malloc(sizeof(uv_async_t));
  uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB);
  uv_read_start((uv_stream_t*)pObj->pipe, allocConnBuffer, onConnection);
  uv_run(pObj->loop, UV_RUN_DEFAULT);
}
static SConnCtx* connCtxCreate() {
SConnCtx* pConn = (SConnCtx*)calloc(1, sizeof(SConnCtx));
return pConn;
}
static void connCtxDestroy(SConnCtx* ctx) {
  if (ctx == NULL) {
    return;
  }
  uv_timer_stop(ctx->pTimer);
  free(ctx->pTimer);
  // TODO: uv_close() is asynchronous; freeing pTcp right away is unsafe
  // until the close callback has run (see the note after uvConnCtxDestroy)
  uv_close((uv_handle_t*)ctx->pTcp, NULL);
  free(ctx->pTcp);
  free(ctx->pWriter);
  free(ctx);
}
static void uvConnCtxDestroy(uv_handle_t* handle) {
SConnCtx* ctx = handle->data;
connCtxDestroy(ctx);
}
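One caveat on the teardown path: uv_close() only schedules the close, so connCtxDestroy's immediate free(ctx->pTcp) can leave libuv touching freed memory. A safer sketch defers the frees to the close callback, the same hook uvConnCtxDestroy already provides on the read path; onTcpClosed is a hypothetical helper, not part of this commit:

// sketch: free handle memory only after libuv has finished closing it
static void onTcpClosed(uv_handle_t* handle) {
  SConnCtx* ctx = handle->data;
  free(ctx->pTimer);
  free(ctx->pTcp);
  free(ctx->pWriter);
  free(ctx);
}
// ... and in connCtxDestroy: uv_close((uv_handle_t*)ctx->pTcp, onTcpClosed);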
#else
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
......