From ac22fa74c164752a478fb3a16dc8902aa207fe6b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 26 Jul 2023 02:45:51 +0000 Subject: [PATCH] add timeout to auth --- src/dnode/src/dnodePeer.c | 95 ++++++++++++++------------- src/dnode/src/dnodeShell.c | 130 ++++++++++++++++++------------------- src/inc/dnode.h | 5 +- src/inc/trpc.h | 87 +++++++++++++------------ src/os/inc/osSemaphore.h | 54 ++++++++------- src/rpc/src/rpcMain.c | 27 +++++++- 6 files changed, 212 insertions(+), 186 deletions(-) diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index cc1b1c98aa..e7a854415e 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -13,58 +13,58 @@ * along with this program. If not, see . */ -/* this file is mainly responsible for the communication between DNODEs. Each +/* this file is mainly responsible for the communication between DNODEs. Each * dnode works as both server and client. Dnode may send status, grant, config - * messages to mnode, mnode may send create/alter/drop table/vnode messages + * messages to mnode, mnode may send create/alter/drop table/vnode messages * to dnode. All theses messages are handled from here */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mnode.h" -#include "dnodeVMgmt.h" -#include "dnodeVWrite.h" -#include "dnodeMPeer.h" #include "dnodeMInfos.h" +#include "dnodeMPeer.h" #include "dnodeStep.h" +#include "dnodeVMgmt.h" +#include "dnodeVWrite.h" +#include "mnode.h" +#include "os.h" static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet); static void *tsServerRpc = NULL; static void *tsClientRpc = NULL; int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue; - - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue; + + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue; SRpcInit rpcInitial; memset(&rpcInitial, 0, sizeof(rpcInitial)); - rpcInitial.localPort = tsDnodeDnodePort; - rpcInitial.label = "DND-S"; + rpcInitial.localPort = tsDnodeDnodePort; + rpcInitial.label = "DND-S"; rpcInitial.numOfThreads = 1; - rpcInitial.cfp = dnodeProcessReqMsgFromDnode; - rpcInitial.sessions = TSDB_MAX_VNODES << 4; - rpcInitial.connType = TAOS_CONN_SERVER; - rpcInitial.idleTime = tsShellActivityTimer * 1000; + rpcInitial.cfp = dnodeProcessReqMsgFromDnode; + rpcInitial.sessions = TSDB_MAX_VNODES << 4; + rpcInitial.connType = TAOS_CONN_SERVER; + rpcInitial.idleTime = tsShellActivityTimer * 1000; tsServerRpc = rpcOpen(&rpcInitial); if (tsServerRpc == NULL) { @@ -85,11 +85,7 @@ void dnodeCleanupServer() { } static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg rspMsg = { - .handle = pMsg->handle, - .pCont = NULL, - .contLen = 0 - }; + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; if (pMsg->pCont == NULL) return; if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) { @@ -122,18 +118,18 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } int32_t dnodeInitClient() { - char secret[TSDB_KEY_LEN] = "secret"; + char secret[TSDB_KEY_LEN] = "secret"; SRpcInit rpcInitial; memset(&rpcInitial, 0, sizeof(rpcInitial)); - rpcInitial.label = "DND-C"; + rpcInitial.label = "DND-C"; rpcInitial.numOfThreads = 1; - rpcInitial.cfp = dnodeProcessRspFromDnode; - rpcInitial.sessions = TSDB_MAX_VNODES << 4; - rpcInitial.connType = TAOS_CONN_CLIENT; - rpcInitial.idleTime = tsShellActivityTimer * 1000; - rpcInitial.user = "t"; - rpcInitial.ckey = "key"; - rpcInitial.secret = secret; + rpcInitial.cfp = dnodeProcessRspFromDnode; + rpcInitial.sessions = TSDB_MAX_VNODES << 4; + rpcInitial.connType = TAOS_CONN_CLIENT; + rpcInitial.idleTime = tsShellActivityTimer * 1000; + rpcInitial.user = "t"; + rpcInitial.ckey = "key"; + rpcInitial.secret = secret; tsClientRpc = rpcOpen(&rpcInitial); if (tsClientRpc == NULL) { @@ -165,7 +161,7 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { dnodeUpdateEpSetForPeer(pEpSet); } - if (dnodeProcessRspMsgFp[pMsg->msgType]) { + if (dnodeProcessRspMsgFp[pMsg->msgType]) { (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); } else { mnodeProcessPeerRsp(pMsg); @@ -174,13 +170,9 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { rpcFreeCont(pMsg->pCont); } -void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { - dnodeProcessRspMsgFp[msgType] = fp; -} +void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { dnodeProcessRspMsgFp[msgType] = fp; } -void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { - rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL); -} +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL); } void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; @@ -189,6 +181,13 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { assert(tsClientRpc != 0); rpcSendRecv(tsClientRpc, &epSet, rpcMsg, rpcRsp); } +void dnodeSendMsgToMnodeRecvWithTimeout(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { + SRpcEpSet epSet = {0}; + dnodeGetEpSetForPeer(&epSet); + + assert(tsClientRpc != 0); + rpcSendRecvWithTimeout(tsClientRpc, &epSet, rpcMsg, rpcRsp); +} void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index af1afc9766..668a14c8dc 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -14,70 +14,70 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "http.h" -#include "mnode.h" -#include "dnodeVRead.h" -#include "dnodeVWrite.h" +#include "dnodeShell.h" #include "dnodeMRead.h" #include "dnodeMWrite.h" -#include "dnodeShell.h" #include "dnodeStep.h" +#include "dnodeVRead.h" +#include "dnodeVWrite.h" +#include "http.h" +#include "mnode.h" +#include "os.h" -static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); +static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); -static void * tsShellRpc = NULL; -static int64_t tsQueryReqNum = 0; +static void *tsShellRpc = NULL; +static int64_t tsQueryReqNum = 0; static int64_t tsSubmitReqNum = 0; int32_t dnodeInitShell() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; // the following message shall be treated as mnode write dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE]= dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = dnodeDispatchToMWriteQueue; // the following message shall be treated as mnode query - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { @@ -86,14 +86,14 @@ int32_t dnodeInitShell() { SRpcInit rpcInitial; memset(&rpcInitial, 0, sizeof(rpcInitial)); - rpcInitial.localPort = tsDnodeShellPort; - rpcInitial.label = "SHELL"; + rpcInitial.localPort = tsDnodeShellPort; + rpcInitial.label = "SHELL"; rpcInitial.numOfThreads = numOfThreads; - rpcInitial.cfp = dnodeProcessMsgFromShell; - rpcInitial.sessions = tsMaxShellConns; - rpcInitial.connType = TAOS_CONN_SERVER; - rpcInitial.idleTime = tsShellActivityTimer * 1000; - rpcInitial.afp = dnodeRetrieveUserAuthInfo; + rpcInitial.cfp = dnodeProcessMsgFromShell; + rpcInitial.sessions = tsMaxShellConns; + rpcInitial.connType = TAOS_CONN_SERVER; + rpcInitial.idleTime = tsShellActivityTimer * 1000; + rpcInitial.afp = dnodeRetrieveUserAuthInfo; tsShellRpc = rpcOpen(&rpcInitial); if (tsShellRpc == NULL) { @@ -113,11 +113,7 @@ void dnodeCleanupShell() { } static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg rpcMsg = { - .handle = pMsg->handle, - .pCont = NULL, - .contLen = 0 - }; + SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; if (pMsg->pCont == NULL) return; @@ -148,9 +144,10 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { atomic_fetch_add_64(&tsQueryReqNum, 1); } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { atomic_fetch_add_64(&tsSubmitReqNum, 1); - } else {} + } else { + } - if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { + if (dnodeProcessShellMsgFp[pMsg->msgType]) { (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); } else { dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); @@ -191,10 +188,10 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char dDebug("user:%s, send auth msg to mnodes", user); SRpcMsg rpcRsp = {0}; - dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); + dnodeSendMsgToMnodeRecvWithTimeout(&rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { - dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); + dError("user:%s, auth msg received from mnodes, error:%s, may be timeout", user, tstrerror(rpcRsp.code)); } else { SAuthRsp *pRsp = rpcRsp.pCont; dDebug("user:%s, auth msg received from mnodes", user); @@ -211,7 +208,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { dDebug("vgId:%d, tid:%d send config table msg to mnode", vgId, tid); - int32_t contLen = sizeof(SConfigTableMsg); + int32_t contLen = sizeof(SConfigTableMsg); SConfigTableMsg *pMsg = rpcMallocCont(contLen); pMsg->dnodeId = htonl(dnodeGetDnodeId()); @@ -236,11 +233,12 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { // delete this after debug finished SMDCreateTableMsg *pTable = rpcRsp.pCont; - int16_t numOfColumns = htons(pTable->numOfColumns); - int16_t numOfTags = htons(pTable->numOfTags); - int32_t tableId = htonl(pTable->tid); - uint64_t uid = htobe64(pTable->uid); - dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableFname, numOfColumns, numOfTags, tableId, uid); + int16_t numOfColumns = htons(pTable->numOfColumns); + int16_t numOfTags = htons(pTable->numOfTags); + int32_t tableId = htonl(pTable->tid); + uint64_t uid = htobe64(pTable->uid); + dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableFname, numOfColumns, numOfTags, + tableId, uid); return rpcRsp.pCont; } @@ -250,9 +248,9 @@ SDnodeStatisInfo dnodeGetStatisInfo() { SDnodeStatisInfo info = {0}; if (dnodeGetRunStatus() == TSDB_RUN_STATUS_RUNING) { #ifdef HTTP_EMBEDDED - info.httpReqNum = httpGetReqCount(); + info.httpReqNum = httpGetReqCount(); #endif - info.queryReqNum = atomic_exchange_64(&tsQueryReqNum, 0); + info.queryReqNum = atomic_exchange_64(&tsQueryReqNum, 0); info.submitReqNum = atomic_exchange_64(&tsSubmitReqNum, 0); } @@ -262,7 +260,7 @@ SDnodeStatisInfo dnodeGetStatisInfo() { int32_t dnodeGetHttpStatusInfo(int32_t idx) { int32_t httpStatus = 0; #ifdef HTTP_EMBEDDED - httpStatus = httpGetStatusCodeCount(idx); + httpStatus = httpGetStatusCodeCount(idx); #endif return httpStatus; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 117b8a5657..dc8aaba5d9 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -20,8 +20,8 @@ extern "C" { #endif -#include "trpc.h" #include "taosmsg.h" +#include "trpc.h" #define MAX_HTTP_STATUS_CODE_NUM 63 typedef struct { @@ -50,6 +50,7 @@ int32_t dnodeStartMnode(SMInfos *pMinfos); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); +void dnodeSendMsgToMnodeRecvWithTimeout(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); @@ -71,7 +72,7 @@ void dnodeSendRpcMWriteRsp(void *pMsg, int32_t code); void dnodeReprocessMWriteMsg(void *pMsg); void dnodeDelayReprocessMWriteMsg(void *pMsg); -void dnodeSendStatusMsgToMnode(); +void dnodeSendStatusMsgToMnode(); typedef struct { char *name; diff --git a/src/inc/trpc.h b/src/inc/trpc.h index fe061eb4f2..c74f7e5163 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -23,28 +23,28 @@ extern "C" { #include #include "taosdef.h" -#define TAOS_CONN_SERVER 0 -#define TAOS_CONN_CLIENT 1 +#define TAOS_CONN_SERVER 0 +#define TAOS_CONN_CLIENT 1 extern int tsRpcHeadSize; typedef struct SRpcEpSet { - int8_t inUse; - int8_t numOfEps; - uint16_t port[TSDB_MAX_REPLICA]; - char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; + int8_t inUse; + int8_t numOfEps; + uint16_t port[TSDB_MAX_REPLICA]; + char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; } SRpcEpSet; typedef struct SRpcCorEpSet { - int32_t version; - SRpcEpSet epSet; + int32_t version; + SRpcEpSet epSet; } SRpcCorEpSet; typedef struct SRpcConnInfo { - uint32_t clientIp; - uint16_t clientPort; - uint32_t serverIp; - char user[TSDB_USER_LEN]; + uint32_t clientIp; + uint16_t clientPort; + uint32_t serverIp; + char user[TSDB_USER_LEN]; } SRpcConnInfo; typedef struct SRpcMsg { @@ -57,46 +57,47 @@ typedef struct SRpcMsg { } SRpcMsg; typedef struct SRpcInit { - uint16_t localPort; // local port - char *label; // for debug purpose - int numOfThreads; // number of threads to handle connections - int sessions; // number of sessions allowed - int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS - int idleTime; // milliseconds, 0 means idle timer is disabled + uint16_t localPort; // local port + char *label; // for debug purpose + int numOfThreads; // number of threads to handle connections + int sessions; // number of sessions allowed + int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS + int idleTime; // milliseconds, 0 means idle timer is disabled // the following is for client app ecurity only - char *user; // user name - char spi; // security parameter index - char encrypt; // encrypt algorithm - char *secret; // key for authentication - char *ckey; // ciphering key + char *user; // user name + char spi; // security parameter index + char encrypt; // encrypt algorithm + char *secret; // key for authentication + char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(SRpcMsg *, SRpcEpSet *); + void (*cfp)(SRpcMsg *, SRpcEpSet *); - // call back to retrieve the client auth info, for server app only - int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); + // call back to retrieve the client auth info, for server app only + int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); } SRpcInit; int32_t rpcInit(); -void rpcCleanup(); -void *rpcOpen(const SRpcInit *pRpc); -void rpcClose(void *); -void *rpcMallocCont(int contLen); -void rpcFreeCont(void *pCont); -void *rpcReallocCont(void *ptr, int contLen); -TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendResponse(const SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(int64_t rid); -int32_t rpcUnusedSession(void * rpcInfo, bool bLock); -// send rpc Refid connection probe alive message -bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver); +void rpcCleanup(); +void *rpcOpen(const SRpcInit *pRpc); +void rpcClose(void *); +void *rpcMallocCont(int contLen); +void rpcFreeCont(void *pCont); +void *rpcReallocCont(void *ptr, int contLen); +TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +void rpcSendResponse(const SRpcMsg *pMsg); +void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSendRecvWithTimeout(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcReportProgress(void *pConn, char *pCont, int contLen); +void rpcCancelRequest(int64_t rid); +int32_t rpcUnusedSession(void *rpcInfo, bool bLock); +// send rpc Refid connection probe alive message +bool rpcSendProbe(int64_t rpcRid, void *pPrevContext, bool *pReqOver); // after sql request send , save conn info -bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext); +bool rpcSaveSendInfo(int64_t rpcRid, void **ppContext); #ifdef __cplusplus } diff --git a/src/os/inc/osSemaphore.h b/src/os/inc/osSemaphore.h index 10d14700e0..6f29318d22 100644 --- a/src/os/inc/osSemaphore.h +++ b/src/os/inc/osSemaphore.h @@ -20,42 +20,46 @@ extern "C" { #endif -#if defined (_TD_DARWIN_64) - typedef struct tsem_s *tsem_t; - int tsem_init(tsem_t *sem, int pshared, unsigned int value); - int tsem_wait(tsem_t *sem); - int tsem_post(tsem_t *sem); - int tsem_destroy(tsem_t *sem); +#include +#if defined(_TD_DARWIN_64) +typedef struct tsem_s *tsem_t; +int tsem_init(tsem_t *sem, int pshared, unsigned int value); +int tsem_wait(tsem_t *sem); +int tsem_post(tsem_t *sem); +int tsem_timewait(tsem_t *sim, int64_t milis); +int tsem_destroy(tsem_t *sem); + #else - #define tsem_t sem_t - #define tsem_init sem_init - int tsem_wait(tsem_t* sem); - #define tsem_post sem_post - #define tsem_destroy sem_destroy +#define tsem_t sem_t +#define tsem_init sem_init +int tsem_wait(tsem_t *sem); +int tsem_timewait(tsem_t *sim, int64_t milis); +#define tsem_post sem_post +#define tsem_destroy sem_destroy #endif -#if defined (_TD_DARWIN_64) - #define pthread_rwlock_t pthread_mutex_t - #define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL) - #define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock) - #define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock) - #define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock) - #define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock) +#if defined(_TD_DARWIN_64) +#define pthread_rwlock_t pthread_mutex_t +#define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL) +#define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock) +#define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock) +#define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock) +#define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock) - #define pthread_spinlock_t pthread_mutex_t - #define pthread_spin_init(lock, NULL) pthread_mutex_init(lock, NULL) - #define pthread_spin_destroy(lock) pthread_mutex_destroy(lock) - #define pthread_spin_lock(lock) pthread_mutex_lock(lock) - #define pthread_spin_unlock(lock) pthread_mutex_unlock(lock) +#define pthread_spinlock_t pthread_mutex_t +#define pthread_spin_init(lock, NULL) pthread_mutex_init(lock, NULL) +#define pthread_spin_destroy(lock) pthread_mutex_destroy(lock) +#define pthread_spin_lock(lock) pthread_mutex_lock(lock) +#define pthread_spin_unlock(lock) pthread_mutex_unlock(lock) #endif bool taosCheckPthreadValid(pthread_t thread); int64_t taosGetSelfPthreadId(); int64_t taosGetPthreadId(pthread_t thread); -void taosResetPthread(pthread_t* thread); +void taosResetPthread(pthread_t *thread); bool taosComparePthread(pthread_t first, pthread_t second); int32_t taosGetPId(); -int32_t taosGetCurrentAPPName(char* name, int32_t* len); +int32_t taosGetCurrentAPPName(char *name, int32_t *len); #ifdef __cplusplus } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index a2d6859a1a..55a94e3746 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -561,6 +561,29 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) return; } +void rpcSendRecvWithTimeout(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + SRpcReqContext *pContext; + pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); + + memset(pRsp, 0, sizeof(SRpcMsg)); + + tsem_t sem; + tsem_init(&sem, 0, 0); + pContext->pSem = &sem; + pContext->pRsp = pRsp; + pContext->pSet = pEpSet; + + int64_t rid = 0; + rpcSendRequest(shandle, pEpSet, pMsg, &rid); + + tsem_timewait(&sem, 3 * 1000); + rpcCancelRequest(rid); + tsem_destroy(&sem); + + pRsp->code = -1; + return; +} + // this API is used by server app to keep an APP context in case connection is broken int rpcReportProgress(void *handle, char *pCont, int contLen) { SRpcConn *pConn = (SRpcConn *)handle; @@ -1469,7 +1492,7 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { if (pConn->connType != RPC_CONN_TCPC) { pContext->code = terrno; return BOOL_ASYNC; - } + } // try next ip again pContext->code = terrno; // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query @@ -1503,7 +1526,7 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { if (writtenLen != msgLen) { tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); ret = false; } -- GitLab