提交 916dbe79 编写于 作者: dengyihao's avatar dengyihao

handle except

上级 a9b712ab
......@@ -38,13 +38,12 @@ typedef struct SRpcConnInfo {
typedef struct SRpcMsg {
tmsg_t msgType;
tmsg_t expectMsgType;
void * pCont;
int contLen;
int32_t code;
void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client
int noResp; // has response or not(default 0 indicate resp);
int noResp; // has response or not(default 0, 0: resp, 1: no resp);
int persistHandle; // persist handle or not
} SRpcMsg;
......
......@@ -181,7 +181,7 @@ typedef struct {
#pragma pack(pop)
typedef enum { Normal, Quit, Release } STransMsgType;
typedef enum { Normal, Quit, Release, Register } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
......@@ -262,7 +262,8 @@ void transReleaseSrvHandle(void* handle);
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx);
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
void transSendResponse(const STransMsg* pMsg);
void transSendResponse(const STransMsg* msg);
void transRegisterMsg(const STransMsg* msg);
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
......
......@@ -144,10 +144,7 @@ void rpcUnrefHandle(void* handle, int8_t type) {
(*taosUnRefHandle[type])(handle);
}
void rpcRegisterBrokenLinkArg(SRpcMsg* msg) {
//
rpcSendResponse(msg);
}
void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { rpcSendResponse(msg); }
void rpcReleaseHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*transReleaseHandle[type])(handle);
......
......@@ -17,6 +17,12 @@
#include "transComm.h"
typedef struct {
int notifyCount; //
int init; // init or not
STransMsg msg;
} SSrvRegArg;
typedef struct SSrvConn {
T_REF_DECLARE()
uv_tcp_t* pTcp;
......@@ -33,7 +39,8 @@ typedef struct SSrvConn {
void* hostThrd;
SArray* srvMsgs;
bool broken; // conn broken;
SSrvRegArg regArg;
bool broken; // conn broken;
ConnStatus status;
struct sockaddr_in addr;
......@@ -117,7 +124,9 @@ static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/)
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease};
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister};
static void uvDestroyConn(uv_handle_t* handle);
......@@ -285,11 +294,13 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
tError("server conn %p read error: %s", conn, uv_err_name(nread));
if (nread < 0) {
conn->broken = true;
// uvNotifyLinkBrokenToApp(conn);
// STrans* pTransInst = conn->pTransInst;
// if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) {
//}
if (conn->status == ConnAcquire) {
if (conn->regArg.init) {
STrans* pTransInst = conn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
}
transUnrefSrvHandle(conn);
}
}
......@@ -317,6 +328,17 @@ void uvOnSendCb(uv_write_t* req, int status) {
if (msg->type == Release && conn->status != ConnNormal) {
conn->status = ConnNormal;
transUnrefSrvHandle(conn);
} else if (msg->type == Register && conn->status == ConnAcquire) {
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
conn->regArg.msg = msg->msg;
if (conn->broken) {
STrans* pTransInst = conn->pTransInst;
(pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
free(msg);
return;
}
destroySmsg(msg);
// send second data, just use for push
......@@ -403,16 +425,6 @@ static void uvStartSendResp(SSrvMsg* smsg) {
return;
}
// static void uvNotifyLinkBrokenToApp(SSrvConn* conn) {
// STrans* pTransInst = conn->pTransInst;
// if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) {
// STransMsg transMsg = {0};
// transMsg.msgType = conn->inType;
// transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// // transRefSrvHandle(conn);
// (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0);
// }
//}
static void destroySmsg(SSrvMsg* smsg) {
if (smsg == NULL) {
return;
......@@ -641,6 +653,7 @@ static SSrvConn* createConn(void* hThrd) {
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
tTrace("conn %p created", pConn);
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
pConn->broken = false;
pConn->status = ConnNormal;
......@@ -774,6 +787,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
if (conn->status == ConnAcquire) {
if (taosArrayGetSize(conn->srvMsgs) > 0) {
taosArrayPush(conn->srvMsgs, &msg);
return;
}
taosArrayPush(conn->srvMsgs, &msg);
uvStartSendRespInternal(msg);
......@@ -790,6 +804,25 @@ void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
// send msg to client
uvStartSendResp(msg);
}
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
if (taosArrayGetSize(conn->srvMsgs) > 0) {
taosArrayPush(conn->srvMsgs, &msg);
return;
}
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
conn->regArg.msg = msg->msg;
if (conn->broken) {
STrans* pTransInst = conn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
free(msg);
}
}
void destroyWorkThrd(SWorkThrdObj* pThrd) {
if (pThrd == NULL) {
return;
......@@ -884,6 +917,20 @@ void transSendResponse(const STransMsg* pMsg) {
tTrace("server conn %p start to send resp", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
}
void transRegisterMsg(const STransMsg* msg) {
if (msg->handle == NULL) {
return;
}
SSrvConn* pConn = msg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd;
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
srvMsg->pConn = pConn;
srvMsg->msg = *msg;
srvMsg->type = Register;
tTrace("server conn %p start to send resp", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
SSrvConn* pConn = thandle;
struct sockaddr_in addr = pConn->addr;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册