提交 7c57b03d 编写于 作者: dengyihao's avatar dengyihao

handle rpc retry

上级 1b339c9c
...@@ -86,7 +86,7 @@ void closeTransporter(STscObj *pTscObj) { ...@@ -86,7 +86,7 @@ void closeTransporter(STscObj *pTscObj) {
static bool clientRpcRfp(int32_t code) { static bool clientRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER) { code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
return true; return true;
} else { } else {
return false; return false;
......
...@@ -70,9 +70,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { ...@@ -70,9 +70,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
} }
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans * pTrans = &pDnode->trans;
int32_t code = -1; int32_t code = -1;
SRpcMsg *pMsg = NULL; SRpcMsg * pMsg = NULL;
SMgmtWrapper *pWrapper = NULL; SMgmtWrapper *pWrapper = NULL;
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
...@@ -194,11 +194,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { ...@@ -194,11 +194,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SArray *pArray = (*pWrapper->func.getHandlesFp)(); SArray * pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1; if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle *pMgmt = taosArrayGet(pArray, i); SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) { if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId; pHandle->needCheckVgId = pMgmt->needCheckVgId;
...@@ -248,7 +248,14 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { ...@@ -248,7 +248,14 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
} }
} }
static bool rpcRfp(int32_t code) { return code == TSDB_CODE_RPC_REDIRECT; } static bool rpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
return true;
} else {
return false;
}
}
int32_t dmInitClient(SDnode *pDnode) { int32_t dmInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
......
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// clang-format off
#include "uv.h" #include "uv.h"
#include "os.h" #include "os.h"
#include "fnLog.h" #include "fnLog.h"
...@@ -25,6 +27,7 @@ ...@@ -25,6 +27,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
// clang-foramt on
typedef struct SUdfdContext { typedef struct SUdfdContext {
uv_loop_t * loop; uv_loop_t * loop;
...@@ -103,12 +106,12 @@ typedef struct SUdfdRpcSendRecvInfo { ...@@ -103,12 +106,12 @@ typedef struct SUdfdRpcSendRecvInfo {
uv_sem_t resultSem; uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo; } SUdfdRpcSendRecvInfo;
static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
static int32_t udfdConnectToMnode(); static int32_t udfdConnectToMnode();
static int32_t udfdLoadUdf(char *udfName, SUdf *udf); static int32_t udfdLoadUdf(char *udfName, SUdf *udf);
static bool udfdRpcRfp(int32_t code); static bool udfdRpcRfp(int32_t code);
static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t udfdOpenClientRpc(); static int32_t udfdOpenClientRpc();
static int32_t udfdCloseClientRpc(); static int32_t udfdCloseClientRpc();
...@@ -126,19 +129,19 @@ static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn- ...@@ -126,19 +129,19 @@ static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn-
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
static void udfdOnNewConnection(uv_stream_t *server, int status); static void udfdOnNewConnection(uv_stream_t *server, int status);
static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); static void udfdIntrSignalHandler(uv_signal_t *handle, int signum);
static int32_t removeListeningPipe(); static int32_t removeListeningPipe();
static void udfdPrintVersion(); static void udfdPrintVersion();
static int32_t udfdParseArgs(int32_t argc, char *argv[]); static int32_t udfdParseArgs(int32_t argc, char *argv[]);
static int32_t udfdInitLog(); static int32_t udfdInitLog();
static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
static int32_t udfdUvInit(); static int32_t udfdUvInit();
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
static int32_t udfdRun(); static int32_t udfdRun();
static void udfdConnectMnodeThreadFunc(void* args); static void udfdConnectMnodeThreadFunc(void *args);
void udfdProcessRequest(uv_work_t *req) { void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
...@@ -401,11 +404,11 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -401,11 +404,11 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
udf->bufSize = pFuncInfo->bufSize; udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
#ifdef WINDOWS #ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s.dll", TD_TMP_DIR_PATH, pFuncInfo->name); snprintf(path, sizeof(path), "%s%s.dll", TD_TMP_DIR_PATH, pFuncInfo->name);
#else #else
snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name); snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name);
#endif #endif
TdFilePtr file = TdFilePtr file =
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
if (file == NULL) { if (file == NULL) {
...@@ -544,7 +547,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { ...@@ -544,7 +547,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
return 0; return 0;
} }
static bool udfdRpcRfp(int32_t code) { static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
return true; return true;
} else { } else {
return false; return false;
...@@ -652,8 +656,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { ...@@ -652,8 +656,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->base = ctx->inputBuf; buf->base = ctx->inputBuf;
buf->len = ctx->inputCap; buf->len = ctx->inputCap;
} else { } else {
fnError("udfd can not allocate enough memory") fnError("udfd can not allocate enough memory") buf->base = NULL;
buf->base = NULL;
buf->len = 0; buf->len = 0;
} }
} else { } else {
...@@ -664,8 +667,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { ...@@ -664,8 +667,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->base = ctx->inputBuf + ctx->inputLen; buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = ctx->inputCap - ctx->inputLen; buf->len = ctx->inputCap - ctx->inputLen;
} else { } else {
fnError("udfd can not allocate enough memory") fnError("udfd can not allocate enough memory") buf->base = NULL;
buf->base = NULL;
buf->len = 0; buf->len = 0;
} }
} }
...@@ -881,7 +883,7 @@ static int32_t udfdRun() { ...@@ -881,7 +883,7 @@ static int32_t udfdRun() {
return 0; return 0;
} }
void udfdConnectMnodeThreadFunc(void* args) { void udfdConnectMnodeThreadFunc(void *args) {
int32_t retryMnodeTimes = 0; int32_t retryMnodeTimes = 0;
int32_t code = 0; int32_t code = 0;
while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) { while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) {
...@@ -939,7 +941,7 @@ int main(int argc, char *argv[]) { ...@@ -939,7 +941,7 @@ int main(int argc, char *argv[]) {
uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL);
udfdRun(); udfdRun();
removeListeningPipe(); removeListeningPipe();
udfdCloseClientRpc(); udfdCloseClientRpc();
......
...@@ -758,13 +758,16 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -758,13 +758,16 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
destroyCmsg(pMsg); destroyCmsg(pMsg);
} }
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd) { SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
int64_t refId = (int64_t)(pMsg->msg.info.handle); int64_t refId = (int64_t)(pMsg->msg.info.handle);
if (refId != 0) { if (refId != 0) {
SExHandle* exh = transAcquireExHandle(refMgt, refId); SExHandle* exh = transAcquireExHandle(refMgt, refId);
if (exh == NULL) { if (exh == NULL) {
assert(0); *ignore = true;
destroyCmsg(pMsg);
return NULL;
// assert(0);
} else { } else {
conn = exh->handle; conn = exh->handle;
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
...@@ -799,7 +802,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -799,7 +802,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
// transPrintEpSet(&pCtx->epSet); // transPrintEpSet(&pCtx->epSet);
SCliConn* conn = cliGetConn(pMsg, pThrd); bool ignore = false;
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
if (ignore == true) {
return;
}
if (conn != NULL) { if (conn != NULL) {
transCtxMerge(&conn->ctx, &pCtx->appCtx); transCtxMerge(&conn->ctx, &pCtx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg); transQueuePush(&conn->cliMsgs, pMsg);
......
...@@ -422,7 +422,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -422,7 +422,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
} else { } else {
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
if (pHead->msgType == 0) pHead->msgType = pConn->inType + 1; if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead))
pHead->msgType = pConn->inType + 1;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册