提交 8640cab1 编写于 作者: dengyihao's avatar dengyihao

feat: add retry

...@@ -45,7 +45,7 @@ typedef struct SRpcHandleInfo { ...@@ -45,7 +45,7 @@ typedef struct SRpcHandleInfo {
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not int32_t persistHandle; // persist handle or not
STraceId traceId; STraceId traceId;
// int64_t traceId; int8_t hasEpSet;
// app info // app info
void *ahandle; // app handle set by client void *ahandle; // app handle set by client
...@@ -123,7 +123,7 @@ void * rpcReallocCont(void *ptr, int32_t contLen); ...@@ -123,7 +123,7 @@ void * rpcReallocCont(void *ptr, int32_t contLen);
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcRegisterBrokenLinkArg(SRpcMsg *msg); void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock void rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
// These functions will not be called in the child process // These functions will not be called in the child process
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
......
...@@ -148,6 +148,7 @@ typedef struct { ...@@ -148,6 +148,7 @@ typedef struct {
char release : 2; char release : 2;
char secured : 2; char secured : 2;
char spi : 2; char spi : 2;
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
char user[TSDB_UNI_LEN]; char user[TSDB_UNI_LEN];
STraceId traceId; STraceId traceId;
...@@ -378,13 +379,10 @@ typedef struct SDelayQueue { ...@@ -378,13 +379,10 @@ typedef struct SDelayQueue {
uv_loop_t* loop; uv_loop_t* loop;
} SDelayQueue; } SDelayQueue;
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDQDestroy(SDelayQueue* queue); void transDQDestroy(SDelayQueue* queue);
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
// void transPrintEpSet(SEpSet* pEpSet);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b); bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
/* /*
* init global func * init global func
......
...@@ -309,6 +309,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -309,6 +309,7 @@ void cliHandleResp(SCliConn* conn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.info.ahandle = NULL; transMsg.info.ahandle = NULL;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
transMsg.info.hasEpSet = pHead->hasEpSet;
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL; STransConnCtx* pCtx = NULL;
...@@ -1011,6 +1012,23 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { ...@@ -1011,6 +1012,23 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
*val = newVal; *val = newVal;
} }
} }
bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) {
if (pResp == NULL || pResp->info.hasEpSet == 0) {
return false;
}
tDeserializeSEpSet(pResp->pCont, pResp->contLen, dst);
int32_t tlen = tSerializeSEpSet(NULL, 0, dst);
int32_t bufLen = pResp->contLen - tlen;
char* buf = rpcMallocCont(bufLen);
memcpy(buf, (char*)pResp->pCont + tlen, bufLen);
pResp->pCont = buf;
pResp->contLen = bufLen;
return true;
}
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
...@@ -1055,6 +1073,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1055,6 +1073,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
} }
STraceId* trace = &pResp->info.traceId; STraceId* trace = &pResp->info.traceId;
if (cliTryToExtractEpSet(pResp, &pCtx->epSet)) {
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
if (pCtx->pSem != NULL) { if (pCtx->pSem != NULL) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pRsp == NULL) { if (pCtx->pRsp == NULL) {
......
...@@ -337,16 +337,8 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -337,16 +337,8 @@ void uvOnSendCb(uv_write_t* req, int status) {
tTrace("conn %p data already was written on stream", conn); tTrace("conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) { if (!transQueueEmpty(&conn->srvMsgs)) {
SSvrMsg* msg = transQueuePop(&conn->srvMsgs); SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
// if (msg->type == Release && conn->status != ConnNormal) {
// conn->status = ConnNormal;
// transUnrefSrvHandle(conn);
// reallocConnRef(conn);
// destroySmsg(msg);
// transQueueClear(&conn->srvMsgs);
// return;
//}
destroySmsg(msg); destroySmsg(msg);
// send second data, just use for push // send cached data
if (!transQueueEmpty(&conn->srvMsgs)) { if (!transQueueEmpty(&conn->srvMsgs)) {
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg->type == Register && conn->status == ConnAcquire) { if (msg->type == Register && conn->status == ConnAcquire) {
...@@ -383,7 +375,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { ...@@ -383,7 +375,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
tError("fail to dispatch conn to work thread"); tError("fail to dispatch conn to work thread");
} }
uv_close((uv_handle_t*)req->data, uvFreeCb); uv_close((uv_handle_t*)req->data, uvFreeCb);
// taosMemoryFree(req->data);
taosMemoryFree(req); taosMemoryFree(req);
} }
...@@ -397,6 +388,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -397,6 +388,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->ahandle = (uint64_t)pMsg->info.ahandle;
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1; pHead->msgType = pConn->inType + 1;
...@@ -409,6 +401,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -409,6 +401,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
} else { } else {
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
// set up resp msg type
if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead)) if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead))
pHead->msgType = pConn->inType + 1; pHead->msgType = pConn->inType + 1;
} }
...@@ -1121,6 +1114,4 @@ _return2: ...@@ -1121,6 +1114,4 @@ _return2:
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
} }
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
#endif #endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册