提交 c45c1a06 编写于 作者: dengyihao's avatar dengyihao

fix push crashed

上级 e5d767cf
...@@ -29,6 +29,8 @@ extern "C" { ...@@ -29,6 +29,8 @@ extern "C" {
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
typedef struct SRpcPush SRpcPush;
typedef struct SRpcConnInfo { typedef struct SRpcConnInfo {
uint32_t clientIp; uint32_t clientIp;
uint16_t clientPort; uint16_t clientPort;
...@@ -43,8 +45,17 @@ typedef struct SRpcMsg { ...@@ -43,8 +45,17 @@ typedef struct SRpcMsg {
int32_t code; int32_t code;
void * handle; // rpc handle returned to app void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client void * ahandle; // app handle set by client
int persist; // keep handle or not, default 0
SRpcPush *push;
} SRpcMsg; } SRpcMsg;
typedef struct SRpcPush {
void *arg;
int (*callback)(void *arg, SRpcMsg *rpcMsg);
} SRpcPush;
typedef struct SRpcInit { typedef struct SRpcInit {
uint16_t localPort; // local port uint16_t localPort; // local port
char * label; // for debug purpose char * label; // for debug purpose
...@@ -83,6 +94,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); ...@@ -83,6 +94,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -27,12 +27,16 @@ typedef struct SCliConn { ...@@ -27,12 +27,16 @@ typedef struct SCliConn {
SConnBuffer readBuf; SConnBuffer readBuf;
void* data; void* data;
queue conn; queue conn;
char spi;
char secured;
uint64_t expireTime; uint64_t expireTime;
int8_t notifyCount; // timers already notify to client int8_t notifyCount; // timers already notify to client
int32_t ref;
SRpcPush* push;
int persist; //
// spi configure
char spi;
char secured;
int32_t ref;
// debug and log info
struct sockaddr_in addr; struct sockaddr_in addr;
} SCliConn; } SCliConn;
...@@ -128,6 +132,10 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -128,6 +132,10 @@ static void clientHandleResp(SCliConn* conn) {
tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
ntohs(conn->addr.sin_port)); ntohs(conn->addr.sin_port));
if (conn->push != NULL) {
(*conn->push->callback)(conn->push->arg, &rpcMsg);
} else {
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
tTrace("client conn(sync) %p handle resp", conn); tTrace("client conn(sync) %p handle resp", conn);
(pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
...@@ -136,6 +144,7 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -136,6 +144,7 @@ static void clientHandleResp(SCliConn* conn) {
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
}
conn->notifyCount += 1; conn->notifyCount += 1;
// buf's mem alread translated to rpcMsg.pCont // buf's mem alread translated to rpcMsg.pCont
...@@ -144,7 +153,10 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -144,7 +153,10 @@ static void clientHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
// user owns conn->persist = 1
if (conn->push != NULL) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
}
destroyCmsg(pMsg); destroyCmsg(pMsg);
conn->data = NULL; conn->data = NULL;
...@@ -154,7 +166,7 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -154,7 +166,7 @@ static void clientHandleResp(SCliConn* conn) {
} }
} }
static void clientHandleExcept(SCliConn* pConn) { static void clientHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) { if (pConn->data == NULL && pConn->push == NULL) {
// handle conn except in conn pool // handle conn except in conn pool
clientConnDestroy(pConn, true); clientConnDestroy(pConn, true);
return; return;
...@@ -169,14 +181,18 @@ static void clientHandleExcept(SCliConn* pConn) { ...@@ -169,14 +181,18 @@ static void clientHandleExcept(SCliConn* pConn) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
if (pConn->push != NULL) {
(*pConn->push->callback)(pConn->push->arg, &rpcMsg);
} else {
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
} else { } else {
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
// SRpcMsg rpcMsg // SRpcMsg rpcMsg
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
}
destroyCmsg(pMsg); destroyCmsg(pMsg);
pConn->data = NULL; pConn->data = NULL;
...@@ -411,6 +427,10 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -411,6 +427,10 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace("client msg tran time cost: %" PRIu64 "", el); tTrace("client msg tran time cost: %" PRIu64 "", el);
et = taosGetTimestampUs(); et = taosGetTimestampUs();
// if (pMsg->msg.handle != NULL) {
// // handle
//}
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) { if (conn != NULL) {
...@@ -426,6 +446,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -426,6 +446,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
} }
clientWrite(conn); clientWrite(conn);
conn->push = pMsg->msg.push;
} else { } else {
SCliConn* conn = calloc(1, sizeof(SCliConn)); SCliConn* conn = calloc(1, sizeof(SCliConn));
conn->ref++; conn->ref++;
...@@ -444,6 +466,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -444,6 +466,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->data = pMsg; conn->data = pMsg;
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->push = pMsg->msg.push;
struct sockaddr_in addr; struct sockaddr_in addr;
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
// handle error in callback if fail to connect // handle error in callback if fail to connect
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册