未验证 提交 a558b445 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #11906 from taosdata/feature/support_retry

Feature/support retry
......@@ -59,9 +59,13 @@ typedef struct {
void * pNode;
} SNodeMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *);
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *);
///
// // SRpcMsg code
// REDIERE,
// NOT READY, EpSet
typedef bool (*RpcRfp)(int32_t code);
typedef struct SRpcInit {
uint16_t localPort; // local port
......
......@@ -103,6 +103,9 @@ typedef void* queue[2];
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 10 // retry count limit
#define TRANS_RETRY_INTERVAL 5 // ms retry interval
typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
......@@ -137,14 +140,12 @@ typedef struct {
int8_t connType; // connection type cli/srv
int64_t rid; // refId returned by taosAddRef
int8_t retryCount;
STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
int hThrdIdx;
char* ip;
uint32_t port;
// SEpSet* pSet; // for synchronous API
int hThrdIdx;
} STransConnCtx;
#pragma pack(push, 1)
......@@ -215,8 +216,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transConnCtxDestroy(STransConnCtx* ctx);
void transFreeMsg(void* msg);
//
......@@ -262,8 +261,8 @@ void transUnrefCliHandle(void* handle);
void transReleaseCliHandle(void* handle);
void transReleaseSrvHandle(void* handle);
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx);
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
void transSendResponse(const STransMsg* msg);
void transRegisterMsg(const STransMsg* msg);
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
......
......@@ -62,8 +62,7 @@ typedef struct {
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
int (*retry)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code);
int32_t refCount;
void* parent;
......
......@@ -38,7 +38,6 @@ void* rpcOpen(const SRpcInit* pInit) {
// register callback handle
pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp;
pRpc->retry = pInit->rfp;
if (pInit->connType == TAOS_CONN_SERVER) {
......@@ -116,19 +115,13 @@ int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRequest(shandle, ip, port, pMsg, NULL);
transSendRequest(shandle, pEpSet, pMsg, NULL);
}
void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRequest(shandle, ip, port, pMsg, pCtx);
transSendRequest(shandle, pEpSet, pMsg, pCtx);
}
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRecv(shandle, ip, port, pMsg, pRsp);
transSendRecv(shandle, pEpSet, pMsg, pRsp);
}
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
/* * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
......@@ -97,7 +96,7 @@ static void cliSendCb(uv_write_t* req, int status);
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
static void cliAppCb(SCliConn* pConn, STransMsg* pMsg);
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
static SCliConn* cliCreateConn(SCliThrdObj* thrd);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
......@@ -227,6 +226,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
static void* cliWorkThread(void* arg);
bool cliMaySendCachedMsg(SCliConn* conn) {
......@@ -311,14 +313,10 @@ void cliHandleResp(SCliConn* conn) {
return;
}
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
cliAppCb(conn, &transMsg);
//(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else {
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn);
memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
tsem_post(pCtx->pSem);
int ret = cliAppCb(conn, &transMsg, pMsg);
if (ret != 0) {
tTrace("try to send req to next node");
return;
}
destroyCmsg(pMsg);
......@@ -375,17 +373,15 @@ void cliHandleExcept(SCliConn* pConn) {
}
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle except", pTransInst->label, pConn);
if (transMsg.ahandle == NULL) {
once = true;
continue;
}
cliAppCb(pConn, &transMsg);
//(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else {
tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
tsem_post(pCtx->pSem);
}
int ret = cliAppCb(pConn, &transMsg, pMsg);
if (ret != 0) {
tTrace("try to send req to next node");
return;
}
destroyCmsg(pMsg);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
......@@ -695,7 +691,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
}
} else {
STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
if (conn != NULL) {
tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
} else {
......@@ -719,10 +715,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
transCtxMerge(&conn->ctx, &pCtx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
// tTrace("%s cli conn %p queue msg size %d", ((STrans*)pThrd->pTransInst)->label, conn, 2);
// return;
//}
// transDestroyBuffer(&conn->readBuf);
cliSend(conn);
} else {
conn = cliCreateConn(pThrd);
......@@ -730,8 +722,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
transQueuePush(&conn->cliMsgs, pMsg);
conn->hThrdIdx = pCtx->hThrdIdx;
conn->ip = strdup(pMsg->ctx->ip);
conn->port = pMsg->ctx->port;
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret) {
......@@ -743,10 +735,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
// uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
// handle error in callback if fail to connect
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) {
tTrace("%s cli conn %p failed to connect to %s:%d, reason: %s", pTransInst->label, conn, conn->ip, conn->port,
uv_err_name(ret));
cliHandleExcept(conn);
return;
}
}
}
static void cliAsyncCb(uv_async_t* handle) {
......@@ -856,12 +852,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
}
static void transDestroyConnCtx(STransConnCtx* ctx) {
if (ctx != NULL) {
taosMemoryFree(ctx->ip);
}
//
taosMemoryFree(ctx);
}
//
void cliSendQuit(SCliThrdObj* thrd) {
// cli can stop gracefully
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
......@@ -881,17 +875,58 @@ int cliRBChoseIdx(STrans* pTransInst) {
}
return index % pTransInst->numOfThreads;
}
void cliAppCb(SCliConn* pConn, STransMsg* transMsg) {
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrdObj* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (transMsg->code == TSDB_CODE_RPC_REDIRECT && pTransInst->retry != NULL) {
SMEpSet emsg = {0};
tDeserializeSMEpSet(transMsg->pCont, transMsg->contLen, &emsg);
pTransInst->retry(pTransInst, transMsg, &(emsg.epSet));
if (pMsg == NULL || pMsg->ctx == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
pTransInst->cfp(pTransInst->parent, pResp, NULL);
return 0;
}
STransConnCtx* pCtx = pMsg->ctx;
SEpSet* pEpSet = &pCtx->epSet;
/*
* upper layer handle retry if code equal TSDB_CODE_RPC_NETWORK_UNAVAIL
*/
tmsg_t msgType = pCtx->msgType;
if ((pTransInst->retry != NULL && (pTransInst->retry(pResp->code))) ||
((pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgType == TDMT_MND_CONNECT)) {
pCtx->retryCount += 1;
pMsg->st = taosGetTimestampUs();
if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pCtx->retryCount < pEpSet->numOfEps) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
cliHandleReq(pMsg, pThrd);
cliDestroy((uv_handle_t*)pConn->stream);
return -1;
}
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
if (pResp->contLen == 0) {
pEpSet->inUse = (pEpSet->inUse++) % pEpSet->numOfEps;
} else {
SMEpSet emsg = {0};
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
pCtx->epSet = emsg.epSet;
}
cliHandleReq(pMsg, pThrd);
// release pConn
addConnToPool(pThrd, pConn);
return -1;
}
}
if (pCtx->pSem != NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
tsem_post(pCtx->pSem);
} else {
pTransInst->cfp(pTransInst->parent, transMsg, NULL);
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
pTransInst->cfp(pTransInst->parent, pResp, pEpSet);
}
return 0;
}
void transCloseClient(void* arg) {
......@@ -934,18 +969,17 @@ void transReleaseCliHandle(void* handle) {
transSendAsync(thrd->asyncPool, &cmsg->q);
}
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) {
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle);
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->handle);
if (index == -1) {
index = cliRBChoseIdx(pTransInst);
}
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType;
pCtx->ip = strdup(ip);
pCtx->port = port;
pCtx->epSet = *pEpSet;
pCtx->ahandle = pReq->ahandle;
pCtx->msgType = pReq->msgType;
pCtx->hThrdIdx = index;
if (ctx != NULL) {
......@@ -955,17 +989,18 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx;
cliMsg->msg = *pMsg;
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pMsg, ip, port, pMsg->ahandle);
tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet),
EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle);
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
}
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
if (index == -1) {
......@@ -973,10 +1008,9 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq
}
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->epSet = *pEpSet;
pCtx->ahandle = pReq->ahandle;
pCtx->msgType = pReq->msgType;
pCtx->ip = strdup(ip);
pCtx->port = port;
pCtx->hThrdIdx = index;
pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t));
pCtx->pRsp = pRsp;
......@@ -989,6 +1023,9 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq
cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet),
EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle);
transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_t* pSem = pCtx->pSem;
tsem_wait(pSem);
......
......@@ -93,11 +93,6 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
return false;
}
void transConnCtxDestroy(STransConnCtx* ctx) {
taosMemoryFree(ctx->ip);
taosMemoryFree(ctx);
}
void transFreeMsg(void* msg) {
if (msg == NULL) {
return;
......@@ -363,10 +358,4 @@ void transQueueDestroy(STransQueue* queue) {
transQueueClear(queue);
taosArrayDestroy(queue->q);
}
// int32_t transGetExHandle() {
// static
//}
// void transThreadOnce() {
// taosThreadOnce(&transModuleInit, );
//}
#endif
......@@ -802,7 +802,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
taosThreadOnce(&transModuleInit, uvInitExHandleMgt);
transSrvInst++;
// uvOpenExHandleMgt(10000);
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
......@@ -831,6 +830,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
} else {
// TODO: clear all other resource later
tError("failed to create worker-thread %d", i);
goto End;
}
}
if (false == addHandleToAcceptloop(srv)) {
......@@ -840,6 +840,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
if (err == 0) {
tDebug("success to create accept-thread");
} else {
tError("failed to create accept-thread");
goto End;
// clear all resource later
}
......@@ -1078,6 +1080,7 @@ void transRegisterMsg(const STransMsg* msg) {
transSendAsync(pThrd->asyncPool, &srvMsg->q);
uvReleaseExHandle(refId);
return;
_return1:
tTrace("server handle %p failed to send to register brokenlink", exh);
rpcFreeCont(msg->pCont);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册