提交 64d979fd 编写于 作者: U ubuntu

update transport

上级 1dfae9b4
......@@ -71,6 +71,10 @@ typedef struct SRpcInit {
// call back to keep conn or not
bool (*pfp)(void *parent, tmsg_t msgType);
// to support Send messages multiple times on a link
//
void* (*mfp)(void *parent, tmsg_t msgType);
void *parent;
} SRpcInit;
......@@ -89,6 +93,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp)
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
// just release client conn to rpc instance, no close sock
void rpcReleaseHandle(void *handle);
void rpcRefHandle(void *handle, int8_t type);
void rpcUnrefHandle(void *handle, int8_t type);
......
......@@ -64,6 +64,7 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
bool (*pfp)(void* parent, tmsg_t msgType);
void* (*mfp)(void* parent, tmsg_t msgType);
int32_t refCount;
void* parent;
......
......@@ -135,13 +135,12 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} \
} while (0);
#define CONN_PERSIST_BY_APP(conn) do { if (conn->persist == false) { conn->persist = true; transRefCliHandle(conn);}} while(0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
static void* cliWorkThread(void* arg);
static void* cliNotifyApp() {}
static void cliHandleResp(SCliConn* conn) {
SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg->ctx;
SCliThrdObj* pThrd = conn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst;
......@@ -157,14 +156,24 @@ static void cliHandleResp(SCliConn* conn) {
rpcMsg.pCont = transContFromHead((char*)pHead);
rpcMsg.code = pHead->code;
rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.ahandle = NULL;
if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
rpcMsg.handle = conn;
transRefCliHandle(conn);
SCliMsg* pMsg = conn->data;
STransConnCtx *pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL;
} else {
rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
//if (rpcMsg.ahandle == NULL) {
// tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn);
// return;
//}
conn->persist = 1;
tDebug("cli conn %p persist by app", conn);
if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
rpcMsg.handle = conn;
CONN_PERSIST_BY_APP(conn);
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
}
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
......@@ -173,7 +182,7 @@ static void cliHandleResp(SCliConn* conn) {
conn->secured = pHead->secured;
if (pCtx->pSem == NULL) {
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else {
......@@ -184,8 +193,7 @@ static void cliHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb);
// user owns conn->persist = 1
if (conn->persist == 0) {
if (CONN_NO_PERSIST_BY_APP(conn)) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
}
destroyCmsg(conn->data);
......@@ -198,22 +206,29 @@ static void cliHandleResp(SCliConn* conn) {
}
static void cliHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) {
// handle conn except in conn pool
transUnrefCliHandle(pConn);
return;
if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
transUnrefCliHandle(pConn);
return;
}
}
SCliThrdObj* pThrd = pConn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst;
SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg->ctx;
STransConnCtx *pCtx = pMsg ? pMsg->ctx : NULL;
SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcMsg.msgType = pMsg->msg.msgType + 1;
rpcMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
rpcMsg.ahandle = NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL;
} else {
rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
if (pCtx->pSem == NULL) {
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else {
......@@ -358,6 +373,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd;
conn->persist = false;
conn->broken = false;
transRefCliHandle(conn);
return conn;
......@@ -476,7 +492,6 @@ static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL;
if (pMsg->msg.handle != NULL) {
conn = (SCliConn*)(pMsg->msg.handle);
transUnrefCliHandle(conn);
if (conn != NULL) {
tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn);
}
......@@ -514,6 +529,7 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
}
conn->hThrdIdx = pCtx->hThrdIdx;
}
static void cliAsyncCb(uv_async_t* handle) {
......@@ -683,6 +699,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// imp later
}
tDebug("send request at thread:%d %p", index, pMsg);
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType;
......
......@@ -260,7 +260,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
}
tError("server conn %p read error: %s", conn, uv_err_name(nread));
if (nread < 0 || nread == UV_EOF) {
if (nread < 0) {
conn->broken = true;
transUnrefSrvHandle(conn);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册