diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index cc1b1c98aa6c97032b4ce6aa198088353c48374f..e7a854415e8dae78acbefb6b1a186859b02a80a3 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -13,58 +13,58 @@ * along with this program. If not, see . */ -/* this file is mainly responsible for the communication between DNODEs. Each +/* this file is mainly responsible for the communication between DNODEs. Each * dnode works as both server and client. Dnode may send status, grant, config - * messages to mnode, mnode may send create/alter/drop table/vnode messages + * messages to mnode, mnode may send create/alter/drop table/vnode messages * to dnode. All theses messages are handled from here */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mnode.h" -#include "dnodeVMgmt.h" -#include "dnodeVWrite.h" -#include "dnodeMPeer.h" #include "dnodeMInfos.h" +#include "dnodeMPeer.h" #include "dnodeStep.h" +#include "dnodeVMgmt.h" +#include "dnodeVWrite.h" +#include "mnode.h" +#include "os.h" static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet); static void *tsServerRpc = NULL; static void *tsClientRpc = NULL; int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue; - - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue; + + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue; SRpcInit rpcInitial; memset(&rpcInitial, 0, sizeof(rpcInitial)); - rpcInitial.localPort = tsDnodeDnodePort; - rpcInitial.label = "DND-S"; + rpcInitial.localPort = tsDnodeDnodePort; + rpcInitial.label = "DND-S"; rpcInitial.numOfThreads = 1; - rpcInitial.cfp = dnodeProcessReqMsgFromDnode; - rpcInitial.sessions = TSDB_MAX_VNODES << 4; - rpcInitial.connType = TAOS_CONN_SERVER; - rpcInitial.idleTime = tsShellActivityTimer * 1000; + rpcInitial.cfp = dnodeProcessReqMsgFromDnode; + rpcInitial.sessions = TSDB_MAX_VNODES << 4; + rpcInitial.connType = TAOS_CONN_SERVER; + rpcInitial.idleTime = tsShellActivityTimer * 1000; tsServerRpc = rpcOpen(&rpcInitial); if (tsServerRpc == NULL) { @@ -85,11 +85,7 @@ void dnodeCleanupServer() { } static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg rspMsg = { - .handle = pMsg->handle, - .pCont = NULL, - .contLen = 0 - }; + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; if (pMsg->pCont == NULL) return; if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) { @@ -122,18 +118,18 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } int32_t dnodeInitClient() { - char secret[TSDB_KEY_LEN] = "secret"; + char secret[TSDB_KEY_LEN] = "secret"; SRpcInit rpcInitial; memset(&rpcInitial, 0, sizeof(rpcInitial)); - rpcInitial.label = "DND-C"; + rpcInitial.label = "DND-C"; rpcInitial.numOfThreads = 1; - rpcInitial.cfp = dnodeProcessRspFromDnode; - rpcInitial.sessions = TSDB_MAX_VNODES << 4; - rpcInitial.connType = TAOS_CONN_CLIENT; - rpcInitial.idleTime = tsShellActivityTimer * 1000; - rpcInitial.user = "t"; - rpcInitial.ckey = "key"; - rpcInitial.secret = secret; + rpcInitial.cfp = dnodeProcessRspFromDnode; + rpcInitial.sessions = TSDB_MAX_VNODES << 4; + rpcInitial.connType = TAOS_CONN_CLIENT; + rpcInitial.idleTime = tsShellActivityTimer * 1000; + rpcInitial.user = "t"; + rpcInitial.ckey = "key"; + rpcInitial.secret = secret; tsClientRpc = rpcOpen(&rpcInitial); if (tsClientRpc == NULL) { @@ -165,7 +161,7 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { dnodeUpdateEpSetForPeer(pEpSet); } - if (dnodeProcessRspMsgFp[pMsg->msgType]) { + if (dnodeProcessRspMsgFp[pMsg->msgType]) { (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); } else { mnodeProcessPeerRsp(pMsg); @@ -174,13 +170,9 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { rpcFreeCont(pMsg->pCont); } -void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { - dnodeProcessRspMsgFp[msgType] = fp; -} +void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { dnodeProcessRspMsgFp[msgType] = fp; } -void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { - rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL); -} +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL); } void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; @@ -189,6 +181,13 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { assert(tsClientRpc != 0); rpcSendRecv(tsClientRpc, &epSet, rpcMsg, rpcRsp); } +void dnodeSendMsgToMnodeRecvWithTimeout(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { + SRpcEpSet epSet = {0}; + dnodeGetEpSetForPeer(&epSet); + + assert(tsClientRpc != 0); + rpcSendRecvWithTimeout(tsClientRpc, &epSet, rpcMsg, rpcRsp); +} void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index af1afc9766af70b180b06cb56bb35d90de40d2eb..668a14c8dcf2af308ff63d66f589d078bf1d3500 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -14,70 +14,70 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "http.h" -#include "mnode.h" -#include "dnodeVRead.h" -#include "dnodeVWrite.h" +#include "dnodeShell.h" #include "dnodeMRead.h" #include "dnodeMWrite.h" -#include "dnodeShell.h" #include "dnodeStep.h" +#include "dnodeVRead.h" +#include "dnodeVWrite.h" +#include "http.h" +#include "mnode.h" +#include "os.h" -static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); +static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); -static void * tsShellRpc = NULL; -static int64_t tsQueryReqNum = 0; +static void *tsShellRpc = NULL; +static int64_t tsQueryReqNum = 0; static int64_t tsSubmitReqNum = 0; int32_t dnodeInitShell() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; // the following message shall be treated as mnode write dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE]= dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = dnodeDispatchToMWriteQueue; // the following message shall be treated as mnode query - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { @@ -86,14 +86,14 @@ int32_t dnodeInitShell() { SRpcInit rpcInitial; memset(&rpcInitial, 0, sizeof(rpcInitial)); - rpcInitial.localPort = tsDnodeShellPort; - rpcInitial.label = "SHELL"; + rpcInitial.localPort = tsDnodeShellPort; + rpcInitial.label = "SHELL"; rpcInitial.numOfThreads = numOfThreads; - rpcInitial.cfp = dnodeProcessMsgFromShell; - rpcInitial.sessions = tsMaxShellConns; - rpcInitial.connType = TAOS_CONN_SERVER; - rpcInitial.idleTime = tsShellActivityTimer * 1000; - rpcInitial.afp = dnodeRetrieveUserAuthInfo; + rpcInitial.cfp = dnodeProcessMsgFromShell; + rpcInitial.sessions = tsMaxShellConns; + rpcInitial.connType = TAOS_CONN_SERVER; + rpcInitial.idleTime = tsShellActivityTimer * 1000; + rpcInitial.afp = dnodeRetrieveUserAuthInfo; tsShellRpc = rpcOpen(&rpcInitial); if (tsShellRpc == NULL) { @@ -113,11 +113,7 @@ void dnodeCleanupShell() { } static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg rpcMsg = { - .handle = pMsg->handle, - .pCont = NULL, - .contLen = 0 - }; + SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; if (pMsg->pCont == NULL) return; @@ -148,9 +144,10 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { atomic_fetch_add_64(&tsQueryReqNum, 1); } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { atomic_fetch_add_64(&tsSubmitReqNum, 1); - } else {} + } else { + } - if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { + if (dnodeProcessShellMsgFp[pMsg->msgType]) { (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); } else { dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); @@ -191,10 +188,10 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char dDebug("user:%s, send auth msg to mnodes", user); SRpcMsg rpcRsp = {0}; - dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); + dnodeSendMsgToMnodeRecvWithTimeout(&rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { - dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); + dError("user:%s, auth msg received from mnodes, error:%s, may be timeout", user, tstrerror(rpcRsp.code)); } else { SAuthRsp *pRsp = rpcRsp.pCont; dDebug("user:%s, auth msg received from mnodes", user); @@ -211,7 +208,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { dDebug("vgId:%d, tid:%d send config table msg to mnode", vgId, tid); - int32_t contLen = sizeof(SConfigTableMsg); + int32_t contLen = sizeof(SConfigTableMsg); SConfigTableMsg *pMsg = rpcMallocCont(contLen); pMsg->dnodeId = htonl(dnodeGetDnodeId()); @@ -236,11 +233,12 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { // delete this after debug finished SMDCreateTableMsg *pTable = rpcRsp.pCont; - int16_t numOfColumns = htons(pTable->numOfColumns); - int16_t numOfTags = htons(pTable->numOfTags); - int32_t tableId = htonl(pTable->tid); - uint64_t uid = htobe64(pTable->uid); - dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableFname, numOfColumns, numOfTags, tableId, uid); + int16_t numOfColumns = htons(pTable->numOfColumns); + int16_t numOfTags = htons(pTable->numOfTags); + int32_t tableId = htonl(pTable->tid); + uint64_t uid = htobe64(pTable->uid); + dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableFname, numOfColumns, numOfTags, + tableId, uid); return rpcRsp.pCont; } @@ -250,9 +248,9 @@ SDnodeStatisInfo dnodeGetStatisInfo() { SDnodeStatisInfo info = {0}; if (dnodeGetRunStatus() == TSDB_RUN_STATUS_RUNING) { #ifdef HTTP_EMBEDDED - info.httpReqNum = httpGetReqCount(); + info.httpReqNum = httpGetReqCount(); #endif - info.queryReqNum = atomic_exchange_64(&tsQueryReqNum, 0); + info.queryReqNum = atomic_exchange_64(&tsQueryReqNum, 0); info.submitReqNum = atomic_exchange_64(&tsSubmitReqNum, 0); } @@ -262,7 +260,7 @@ SDnodeStatisInfo dnodeGetStatisInfo() { int32_t dnodeGetHttpStatusInfo(int32_t idx) { int32_t httpStatus = 0; #ifdef HTTP_EMBEDDED - httpStatus = httpGetStatusCodeCount(idx); + httpStatus = httpGetStatusCodeCount(idx); #endif return httpStatus; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 117b8a5657046abb6412144b352fca79b9d590da..dc8aaba5d93fbb6c3d51bdef59f7cea6377e6974 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -20,8 +20,8 @@ extern "C" { #endif -#include "trpc.h" #include "taosmsg.h" +#include "trpc.h" #define MAX_HTTP_STATUS_CODE_NUM 63 typedef struct { @@ -50,6 +50,7 @@ int32_t dnodeStartMnode(SMInfos *pMinfos); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); +void dnodeSendMsgToMnodeRecvWithTimeout(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); @@ -71,7 +72,7 @@ void dnodeSendRpcMWriteRsp(void *pMsg, int32_t code); void dnodeReprocessMWriteMsg(void *pMsg); void dnodeDelayReprocessMWriteMsg(void *pMsg); -void dnodeSendStatusMsgToMnode(); +void dnodeSendStatusMsgToMnode(); typedef struct { char *name; diff --git a/src/inc/trpc.h b/src/inc/trpc.h index fe061eb4f20fa550299316c028d99cdd9d6e8bd7..c74f7e51635825bd0be16016c01600d16c65a9ee 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -23,28 +23,28 @@ extern "C" { #include #include "taosdef.h" -#define TAOS_CONN_SERVER 0 -#define TAOS_CONN_CLIENT 1 +#define TAOS_CONN_SERVER 0 +#define TAOS_CONN_CLIENT 1 extern int tsRpcHeadSize; typedef struct SRpcEpSet { - int8_t inUse; - int8_t numOfEps; - uint16_t port[TSDB_MAX_REPLICA]; - char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; + int8_t inUse; + int8_t numOfEps; + uint16_t port[TSDB_MAX_REPLICA]; + char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; } SRpcEpSet; typedef struct SRpcCorEpSet { - int32_t version; - SRpcEpSet epSet; + int32_t version; + SRpcEpSet epSet; } SRpcCorEpSet; typedef struct SRpcConnInfo { - uint32_t clientIp; - uint16_t clientPort; - uint32_t serverIp; - char user[TSDB_USER_LEN]; + uint32_t clientIp; + uint16_t clientPort; + uint32_t serverIp; + char user[TSDB_USER_LEN]; } SRpcConnInfo; typedef struct SRpcMsg { @@ -57,46 +57,47 @@ typedef struct SRpcMsg { } SRpcMsg; typedef struct SRpcInit { - uint16_t localPort; // local port - char *label; // for debug purpose - int numOfThreads; // number of threads to handle connections - int sessions; // number of sessions allowed - int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS - int idleTime; // milliseconds, 0 means idle timer is disabled + uint16_t localPort; // local port + char *label; // for debug purpose + int numOfThreads; // number of threads to handle connections + int sessions; // number of sessions allowed + int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS + int idleTime; // milliseconds, 0 means idle timer is disabled // the following is for client app ecurity only - char *user; // user name - char spi; // security parameter index - char encrypt; // encrypt algorithm - char *secret; // key for authentication - char *ckey; // ciphering key + char *user; // user name + char spi; // security parameter index + char encrypt; // encrypt algorithm + char *secret; // key for authentication + char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(SRpcMsg *, SRpcEpSet *); + void (*cfp)(SRpcMsg *, SRpcEpSet *); - // call back to retrieve the client auth info, for server app only - int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); + // call back to retrieve the client auth info, for server app only + int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); } SRpcInit; int32_t rpcInit(); -void rpcCleanup(); -void *rpcOpen(const SRpcInit *pRpc); -void rpcClose(void *); -void *rpcMallocCont(int contLen); -void rpcFreeCont(void *pCont); -void *rpcReallocCont(void *ptr, int contLen); -TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendResponse(const SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(int64_t rid); -int32_t rpcUnusedSession(void * rpcInfo, bool bLock); -// send rpc Refid connection probe alive message -bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver); +void rpcCleanup(); +void *rpcOpen(const SRpcInit *pRpc); +void rpcClose(void *); +void *rpcMallocCont(int contLen); +void rpcFreeCont(void *pCont); +void *rpcReallocCont(void *ptr, int contLen); +TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +void rpcSendResponse(const SRpcMsg *pMsg); +void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSendRecvWithTimeout(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcReportProgress(void *pConn, char *pCont, int contLen); +void rpcCancelRequest(int64_t rid); +int32_t rpcUnusedSession(void *rpcInfo, bool bLock); +// send rpc Refid connection probe alive message +bool rpcSendProbe(int64_t rpcRid, void *pPrevContext, bool *pReqOver); // after sql request send , save conn info -bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext); +bool rpcSaveSendInfo(int64_t rpcRid, void **ppContext); #ifdef __cplusplus } diff --git a/src/os/inc/osSemaphore.h b/src/os/inc/osSemaphore.h index 10d14700e013f66e6d98208f0e65fe1ca5fc3874..6f29318d22ee92404c5cf9be005139ae679c411a 100644 --- a/src/os/inc/osSemaphore.h +++ b/src/os/inc/osSemaphore.h @@ -20,42 +20,46 @@ extern "C" { #endif -#if defined (_TD_DARWIN_64) - typedef struct tsem_s *tsem_t; - int tsem_init(tsem_t *sem, int pshared, unsigned int value); - int tsem_wait(tsem_t *sem); - int tsem_post(tsem_t *sem); - int tsem_destroy(tsem_t *sem); +#include +#if defined(_TD_DARWIN_64) +typedef struct tsem_s *tsem_t; +int tsem_init(tsem_t *sem, int pshared, unsigned int value); +int tsem_wait(tsem_t *sem); +int tsem_post(tsem_t *sem); +int tsem_timewait(tsem_t *sim, int64_t milis); +int tsem_destroy(tsem_t *sem); + #else - #define tsem_t sem_t - #define tsem_init sem_init - int tsem_wait(tsem_t* sem); - #define tsem_post sem_post - #define tsem_destroy sem_destroy +#define tsem_t sem_t +#define tsem_init sem_init +int tsem_wait(tsem_t *sem); +int tsem_timewait(tsem_t *sim, int64_t milis); +#define tsem_post sem_post +#define tsem_destroy sem_destroy #endif -#if defined (_TD_DARWIN_64) - #define pthread_rwlock_t pthread_mutex_t - #define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL) - #define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock) - #define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock) - #define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock) - #define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock) +#if defined(_TD_DARWIN_64) +#define pthread_rwlock_t pthread_mutex_t +#define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL) +#define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock) +#define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock) +#define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock) +#define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock) - #define pthread_spinlock_t pthread_mutex_t - #define pthread_spin_init(lock, NULL) pthread_mutex_init(lock, NULL) - #define pthread_spin_destroy(lock) pthread_mutex_destroy(lock) - #define pthread_spin_lock(lock) pthread_mutex_lock(lock) - #define pthread_spin_unlock(lock) pthread_mutex_unlock(lock) +#define pthread_spinlock_t pthread_mutex_t +#define pthread_spin_init(lock, NULL) pthread_mutex_init(lock, NULL) +#define pthread_spin_destroy(lock) pthread_mutex_destroy(lock) +#define pthread_spin_lock(lock) pthread_mutex_lock(lock) +#define pthread_spin_unlock(lock) pthread_mutex_unlock(lock) #endif bool taosCheckPthreadValid(pthread_t thread); int64_t taosGetSelfPthreadId(); int64_t taosGetPthreadId(pthread_t thread); -void taosResetPthread(pthread_t* thread); +void taosResetPthread(pthread_t *thread); bool taosComparePthread(pthread_t first, pthread_t second); int32_t taosGetPId(); -int32_t taosGetCurrentAPPName(char* name, int32_t* len); +int32_t taosGetCurrentAPPName(char *name, int32_t *len); #ifdef __cplusplus } diff --git a/src/os/src/darwin/dwSemaphore.c b/src/os/src/darwin/dwSemaphore.c index 25cb28cff1b6f9c83cab43faf68641717450c0ea..3e478e031883f2681ac23fc9e232444e357f0367 100644 --- a/src/os/src/darwin/dwSemaphore.c +++ b/src/os/src/darwin/dwSemaphore.c @@ -195,6 +195,7 @@ int tsem_wait(tsem_t *sem) { #endif // SEM_USE_PTHREAD } + int tsem_post(tsem_t *sem) { if (!*sem) { fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem); diff --git a/src/os/src/detail/osSemaphore.c b/src/os/src/detail/osSemaphore.c index 06907d52582e8cdb5cfdbe6da4724c4cf2bc8151..a4f21d501a6b0f2ef8681c81478938067ec5823b 100644 --- a/src/os/src/detail/osSemaphore.c +++ b/src/os/src/detail/osSemaphore.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" -#if !defined (_TD_DARWIN_64) +#if !defined(_TD_DARWIN_64) int32_t tsem_wait(tsem_t* sem) { int ret = 0; @@ -28,7 +28,25 @@ int32_t tsem_wait(tsem_t* sem) { #endif -#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) || defined (_TD_DARWIN_64)) +#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) || defined(_TD_DARWIN_64)) + +int32_t tsem_timewait(tsem_t* sem, int64_t ms) { + int ret = 0; + + struct timespec ts = {0}; + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + return -1; + } + + ts.tv_nsec += ms * 1000000; + ts.tv_sec += ts.tv_nsec / 1000000000; + ts.tv_nsec %= 1000000000; + + while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue; + + return ret; +} bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } @@ -40,13 +58,13 @@ int64_t taosGetSelfPthreadId() { } int64_t taosGetPthreadId(pthread_t thread) { return (int64_t)thread; } -void taosResetPthread(pthread_t *thread) { *thread = 0; } +void taosResetPthread(pthread_t* thread) { *thread = 0; } bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; } int32_t taosGetPId() { return getpid(); } -int32_t taosGetCurrentAPPName(char *name, int32_t* len) { +int32_t taosGetCurrentAPPName(char* name, int32_t* len) { const char* self = "/proc/self/exe"; - char path[PATH_MAX] = {0}; + char path[PATH_MAX] = {0}; if (readlink(self, path, PATH_MAX) <= 0) { return -1; @@ -70,3 +88,44 @@ int32_t taosGetCurrentAPPName(char *name, int32_t* len) { } #endif + +#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) +int32_t tsem_timewait(tsem_t* sem, int64_t ms) { + return tsem_wait(sem); + // struct timespec ts; + // taosClockGetTime(0, &ts); + + // ts.tv_nsec += ms * 1000000; + // ts.tv_sec += ts.tv_nsec / 1000000000; + // ts.tv_nsec %= 1000000000; + // int rc; + // while ((rc = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue; + // return rc; + // /* This should have timed out */ + // // ASSERT(errno == ETIMEDOUT); + // // ASSERT(rc != 0); + // // GetSystemTimeAsFileTime(&ft_after); + // // // We specified a non-zero wait. Time must advance. + // // if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime) + // // { + // // printf("nanoseconds: %d, rc: %d, code:0x%x. before filetime: %d, %d; after filetime: %d, %d\n", + // // nanosecs, rc, errno, + // // (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime, + // // (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime); + // // printf("time must advance during sem_timedwait."); + // // return 1; + // // } +} + +#endif + +// #if defined(_TD_DARWIN_64) + +// int tsem_timewait(tsem_t* psem, int64_t milis) { +// if (psem == NULL || *psem == NULL) return -1; +// dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC)); +// dispatch_semaphore_wait(*psem, time); +// return 0; +// } + +// #endif \ No newline at end of file diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index a2d6859a1a7ce1e08ce4de2cc72fd06bf6dbc889..bc9d24b62f7e09f637bb8c7ba5e1cdc45e5778e8 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -561,6 +561,37 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) return; } +void rpcSendRecvWithTimeout(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + SRpcReqContext *pContext; + pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); + + memset(pRsp, 0, sizeof(SRpcMsg)); + + tsem_t sem; + tsem_init(&sem, 0, 0); + pContext->pSem = &sem; + pContext->pRsp = pRsp; + pContext->pSet = pEpSet; + + int64_t rid = 0; + rpcSendRequest(shandle, pEpSet, pMsg, &rid); + +#if defined (LINUX) + if (tsem_timewait(&sem, 3 * 1000) == 0) { + // do nothing + } else { + rpcCancelRequest(rid); + pRsp->code = -1; + } + +#else + tsem_wait(&sem); +#endif + tsem_destroy(&sem); + + return; +} + // this API is used by server app to keep an APP context in case connection is broken int rpcReportProgress(void *handle, char *pCont, int contLen) { SRpcConn *pConn = (SRpcConn *)handle; @@ -1469,7 +1500,7 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { if (pConn->connType != RPC_CONN_TCPC) { pContext->code = terrno; return BOOL_ASYNC; - } + } // try next ip again pContext->code = terrno; // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query @@ -1503,7 +1534,7 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { if (writtenLen != msgLen) { tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); ret = false; }