提交 a5253f25 编写于 作者: dengyihao's avatar dengyihao

add client

上级 a50c3f3d
...@@ -23,6 +23,8 @@ typedef struct SCliConn { ...@@ -23,6 +23,8 @@ typedef struct SCliConn {
uv_write_t* writeReq; uv_write_t* writeReq;
void* data; void* data;
queue conn; queue conn;
char spi;
char secured;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
SRpcReqContext* context; SRpcReqContext* context;
...@@ -47,6 +49,10 @@ typedef struct SClientObj { ...@@ -47,6 +49,10 @@ typedef struct SClientObj {
SCliThrdObj** pThreadObj; SCliThrdObj** pThreadObj;
} SClientObj; } SClientObj;
// conn pool
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void clientWriteCb(uv_write_t* req, int status); static void clientWriteCb(uv_write_t* req, int status);
...@@ -93,6 +99,16 @@ static void clientWriteCb(uv_write_t* req, int status) { ...@@ -93,6 +99,16 @@ static void clientWriteCb(uv_write_t* req, int status) {
uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb); uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb);
// impl later // impl later
} }
static void clientWrite(SCliConn* pConn) {
SCliMsg* pMsg = pConn->data;
SRpcHead* pHead = rpcHeadFromCont(pMsg->context->pCont);
int msgLen = rpcMsgLenFromCont(pMsg->context->contLen);
char* msg = (char*)(pHead);
uv_buf_t wb = uv_buf_init(msg, msgLen);
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
}
static void clientConnCb(uv_connect_t* req, int status) { static void clientConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
...@@ -105,8 +121,8 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -105,8 +121,8 @@ static void clientConnCb(uv_connect_t* req, int status) {
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
SEpSet* pEpSet = &pMsg->context->epSet; SEpSet* pEpSet = &pMsg->context->epSet;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.ahandle = pMsg->context->ahandle; // rpcMsg.ahandle = pMsg->context->ahandle;
rpcMsg.pCont = NULL; // rpcMsg.pCont = NULL;
char* fqdn = pEpSet->fqdn[pEpSet->inUse]; char* fqdn = pEpSet->fqdn[pEpSet->inUse];
uint32_t port = pEpSet->port[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse];
...@@ -119,15 +135,16 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -119,15 +135,16 @@ static void clientConnCb(uv_connect_t* req, int status) {
return; return;
} }
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
uv_buf_t wb;
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
} }
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
// impl later // impl later
return NULL; return NULL;
} }
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
// impl later
}
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SEpSet* pEpSet = &pMsg->context->epSet; SEpSet* pEpSet = &pMsg->context->epSet;
...@@ -140,11 +157,12 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -140,11 +157,12 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port);
if (conn != NULL) { if (conn != NULL) {
// impl later
conn->data = pMsg; conn->data = pMsg;
conn->writeReq->data = conn; conn->writeReq->data = conn;
uv_buf_t wb; clientWrite(conn);
uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb); // uv_buf_t wb;
// impl later // uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb);
} else { } else {
SCliConn* conn = malloc(sizeof(SCliConn)); SCliConn* conn = malloc(sizeof(SCliConn));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册