提交 ac22fa74 编写于 作者: dengyihao's avatar dengyihao

add timeout to auth

上级 240171d3
...@@ -13,58 +13,58 @@ ...@@ -13,58 +13,58 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
/* this file is mainly responsible for the communication between DNODEs. Each /* this file is mainly responsible for the communication between DNODEs. Each
* dnode works as both server and client. Dnode may send status, grant, config * dnode works as both server and client. Dnode may send status, grant, config
* messages to mnode, mnode may send create/alter/drop table/vnode messages * messages to mnode, mnode may send create/alter/drop table/vnode messages
* to dnode. All theses messages are handled from here * to dnode. All theses messages are handled from here
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "dnodeVMgmt.h"
#include "dnodeVWrite.h"
#include "dnodeMPeer.h"
#include "dnodeMInfos.h" #include "dnodeMInfos.h"
#include "dnodeMPeer.h"
#include "dnodeStep.h" #include "dnodeStep.h"
#include "dnodeVMgmt.h"
#include "dnodeVWrite.h"
#include "mnode.h"
#include "os.h"
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *);
static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet); static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
static void *tsServerRpc = NULL; static void *tsServerRpc = NULL;
static void *tsClientRpc = NULL; static void *tsClientRpc = NULL;
int32_t dnodeInitServer() { int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue;
SRpcInit rpcInitial; SRpcInit rpcInitial;
memset(&rpcInitial, 0, sizeof(rpcInitial)); memset(&rpcInitial, 0, sizeof(rpcInitial));
rpcInitial.localPort = tsDnodeDnodePort; rpcInitial.localPort = tsDnodeDnodePort;
rpcInitial.label = "DND-S"; rpcInitial.label = "DND-S";
rpcInitial.numOfThreads = 1; rpcInitial.numOfThreads = 1;
rpcInitial.cfp = dnodeProcessReqMsgFromDnode; rpcInitial.cfp = dnodeProcessReqMsgFromDnode;
rpcInitial.sessions = TSDB_MAX_VNODES << 4; rpcInitial.sessions = TSDB_MAX_VNODES << 4;
rpcInitial.connType = TAOS_CONN_SERVER; rpcInitial.connType = TAOS_CONN_SERVER;
rpcInitial.idleTime = tsShellActivityTimer * 1000; rpcInitial.idleTime = tsShellActivityTimer * 1000;
tsServerRpc = rpcOpen(&rpcInitial); tsServerRpc = rpcOpen(&rpcInitial);
if (tsServerRpc == NULL) { if (tsServerRpc == NULL) {
...@@ -85,11 +85,7 @@ void dnodeCleanupServer() { ...@@ -85,11 +85,7 @@ void dnodeCleanupServer() {
} }
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = { SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0
};
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) { if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
...@@ -122,18 +118,18 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -122,18 +118,18 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} }
int32_t dnodeInitClient() { int32_t dnodeInitClient() {
char secret[TSDB_KEY_LEN] = "secret"; char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInitial; SRpcInit rpcInitial;
memset(&rpcInitial, 0, sizeof(rpcInitial)); memset(&rpcInitial, 0, sizeof(rpcInitial));
rpcInitial.label = "DND-C"; rpcInitial.label = "DND-C";
rpcInitial.numOfThreads = 1; rpcInitial.numOfThreads = 1;
rpcInitial.cfp = dnodeProcessRspFromDnode; rpcInitial.cfp = dnodeProcessRspFromDnode;
rpcInitial.sessions = TSDB_MAX_VNODES << 4; rpcInitial.sessions = TSDB_MAX_VNODES << 4;
rpcInitial.connType = TAOS_CONN_CLIENT; rpcInitial.connType = TAOS_CONN_CLIENT;
rpcInitial.idleTime = tsShellActivityTimer * 1000; rpcInitial.idleTime = tsShellActivityTimer * 1000;
rpcInitial.user = "t"; rpcInitial.user = "t";
rpcInitial.ckey = "key"; rpcInitial.ckey = "key";
rpcInitial.secret = secret; rpcInitial.secret = secret;
tsClientRpc = rpcOpen(&rpcInitial); tsClientRpc = rpcOpen(&rpcInitial);
if (tsClientRpc == NULL) { if (tsClientRpc == NULL) {
...@@ -165,7 +161,7 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -165,7 +161,7 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
dnodeUpdateEpSetForPeer(pEpSet); dnodeUpdateEpSetForPeer(pEpSet);
} }
if (dnodeProcessRspMsgFp[pMsg->msgType]) { if (dnodeProcessRspMsgFp[pMsg->msgType]) {
(*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg);
} else { } else {
mnodeProcessPeerRsp(pMsg); mnodeProcessPeerRsp(pMsg);
...@@ -174,13 +170,9 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -174,13 +170,9 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { dnodeProcessRspMsgFp[msgType] = fp; }
dnodeProcessRspMsgFp[msgType] = fp;
}
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL); }
rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL);
}
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
...@@ -189,6 +181,13 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { ...@@ -189,6 +181,13 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
assert(tsClientRpc != 0); assert(tsClientRpc != 0);
rpcSendRecv(tsClientRpc, &epSet, rpcMsg, rpcRsp); rpcSendRecv(tsClientRpc, &epSet, rpcMsg, rpcRsp);
} }
void dnodeSendMsgToMnodeRecvWithTimeout(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(&epSet);
assert(tsClientRpc != 0);
rpcSendRecvWithTimeout(tsClientRpc, &epSet, rpcMsg, rpcRsp);
}
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp); rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp);
......
...@@ -14,70 +14,70 @@ ...@@ -14,70 +14,70 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "dnodeShell.h"
#include "http.h"
#include "mnode.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "dnodeMRead.h" #include "dnodeMRead.h"
#include "dnodeMWrite.h" #include "dnodeMWrite.h"
#include "dnodeShell.h"
#include "dnodeStep.h" #include "dnodeStep.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "http.h"
#include "mnode.h"
#include "os.h"
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void * tsShellRpc = NULL; static void *tsShellRpc = NULL;
static int64_t tsQueryReqNum = 0; static int64_t tsQueryReqNum = 0;
static int64_t tsSubmitReqNum = 0; static int64_t tsSubmitReqNum = 0;
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = dnodeDispatchToMWriteQueue;
// the following message shall be treated as mnode query // the following message shall be treated as mnode query
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
...@@ -86,14 +86,14 @@ int32_t dnodeInitShell() { ...@@ -86,14 +86,14 @@ int32_t dnodeInitShell() {
SRpcInit rpcInitial; SRpcInit rpcInitial;
memset(&rpcInitial, 0, sizeof(rpcInitial)); memset(&rpcInitial, 0, sizeof(rpcInitial));
rpcInitial.localPort = tsDnodeShellPort; rpcInitial.localPort = tsDnodeShellPort;
rpcInitial.label = "SHELL"; rpcInitial.label = "SHELL";
rpcInitial.numOfThreads = numOfThreads; rpcInitial.numOfThreads = numOfThreads;
rpcInitial.cfp = dnodeProcessMsgFromShell; rpcInitial.cfp = dnodeProcessMsgFromShell;
rpcInitial.sessions = tsMaxShellConns; rpcInitial.sessions = tsMaxShellConns;
rpcInitial.connType = TAOS_CONN_SERVER; rpcInitial.connType = TAOS_CONN_SERVER;
rpcInitial.idleTime = tsShellActivityTimer * 1000; rpcInitial.idleTime = tsShellActivityTimer * 1000;
rpcInitial.afp = dnodeRetrieveUserAuthInfo; rpcInitial.afp = dnodeRetrieveUserAuthInfo;
tsShellRpc = rpcOpen(&rpcInitial); tsShellRpc = rpcOpen(&rpcInitial);
if (tsShellRpc == NULL) { if (tsShellRpc == NULL) {
...@@ -113,11 +113,7 @@ void dnodeCleanupShell() { ...@@ -113,11 +113,7 @@ void dnodeCleanupShell() {
} }
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0
};
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
...@@ -148,9 +144,10 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -148,9 +144,10 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
atomic_fetch_add_64(&tsQueryReqNum, 1); atomic_fetch_add_64(&tsQueryReqNum, 1);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
atomic_fetch_add_64(&tsSubmitReqNum, 1); atomic_fetch_add_64(&tsSubmitReqNum, 1);
} else {} } else {
}
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { if (dnodeProcessShellMsgFp[pMsg->msgType]) {
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
} else { } else {
dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
...@@ -191,10 +188,10 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char ...@@ -191,10 +188,10 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
dDebug("user:%s, send auth msg to mnodes", user); dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); dnodeSendMsgToMnodeRecvWithTimeout(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); dError("user:%s, auth msg received from mnodes, error:%s, may be timeout", user, tstrerror(rpcRsp.code));
} else { } else {
SAuthRsp *pRsp = rpcRsp.pCont; SAuthRsp *pRsp = rpcRsp.pCont;
dDebug("user:%s, auth msg received from mnodes", user); dDebug("user:%s, auth msg received from mnodes", user);
...@@ -211,7 +208,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char ...@@ -211,7 +208,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
dDebug("vgId:%d, tid:%d send config table msg to mnode", vgId, tid); dDebug("vgId:%d, tid:%d send config table msg to mnode", vgId, tid);
int32_t contLen = sizeof(SConfigTableMsg); int32_t contLen = sizeof(SConfigTableMsg);
SConfigTableMsg *pMsg = rpcMallocCont(contLen); SConfigTableMsg *pMsg = rpcMallocCont(contLen);
pMsg->dnodeId = htonl(dnodeGetDnodeId()); pMsg->dnodeId = htonl(dnodeGetDnodeId());
...@@ -236,11 +233,12 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { ...@@ -236,11 +233,12 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
// delete this after debug finished // delete this after debug finished
SMDCreateTableMsg *pTable = rpcRsp.pCont; SMDCreateTableMsg *pTable = rpcRsp.pCont;
int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags); int16_t numOfTags = htons(pTable->numOfTags);
int32_t tableId = htonl(pTable->tid); int32_t tableId = htonl(pTable->tid);
uint64_t uid = htobe64(pTable->uid); uint64_t uid = htobe64(pTable->uid);
dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableFname, numOfColumns, numOfTags, tableId, uid); dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableFname, numOfColumns, numOfTags,
tableId, uid);
return rpcRsp.pCont; return rpcRsp.pCont;
} }
...@@ -250,9 +248,9 @@ SDnodeStatisInfo dnodeGetStatisInfo() { ...@@ -250,9 +248,9 @@ SDnodeStatisInfo dnodeGetStatisInfo() {
SDnodeStatisInfo info = {0}; SDnodeStatisInfo info = {0};
if (dnodeGetRunStatus() == TSDB_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() == TSDB_RUN_STATUS_RUNING) {
#ifdef HTTP_EMBEDDED #ifdef HTTP_EMBEDDED
info.httpReqNum = httpGetReqCount(); info.httpReqNum = httpGetReqCount();
#endif #endif
info.queryReqNum = atomic_exchange_64(&tsQueryReqNum, 0); info.queryReqNum = atomic_exchange_64(&tsQueryReqNum, 0);
info.submitReqNum = atomic_exchange_64(&tsSubmitReqNum, 0); info.submitReqNum = atomic_exchange_64(&tsSubmitReqNum, 0);
} }
...@@ -262,7 +260,7 @@ SDnodeStatisInfo dnodeGetStatisInfo() { ...@@ -262,7 +260,7 @@ SDnodeStatisInfo dnodeGetStatisInfo() {
int32_t dnodeGetHttpStatusInfo(int32_t idx) { int32_t dnodeGetHttpStatusInfo(int32_t idx) {
int32_t httpStatus = 0; int32_t httpStatus = 0;
#ifdef HTTP_EMBEDDED #ifdef HTTP_EMBEDDED
httpStatus = httpGetStatusCodeCount(idx); httpStatus = httpGetStatusCodeCount(idx);
#endif #endif
return httpStatus; return httpStatus;
} }
......
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#include "trpc.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "trpc.h"
#define MAX_HTTP_STATUS_CODE_NUM 63 #define MAX_HTTP_STATUS_CODE_NUM 63
typedef struct { typedef struct {
...@@ -50,6 +50,7 @@ int32_t dnodeStartMnode(SMInfos *pMinfos); ...@@ -50,6 +50,7 @@ int32_t dnodeStartMnode(SMInfos *pMinfos);
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToMnodeRecvWithTimeout(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
...@@ -71,7 +72,7 @@ void dnodeSendRpcMWriteRsp(void *pMsg, int32_t code); ...@@ -71,7 +72,7 @@ void dnodeSendRpcMWriteRsp(void *pMsg, int32_t code);
void dnodeReprocessMWriteMsg(void *pMsg); void dnodeReprocessMWriteMsg(void *pMsg);
void dnodeDelayReprocessMWriteMsg(void *pMsg); void dnodeDelayReprocessMWriteMsg(void *pMsg);
void dnodeSendStatusMsgToMnode(); void dnodeSendStatusMsgToMnode();
typedef struct { typedef struct {
char *name; char *name;
......
...@@ -23,28 +23,28 @@ extern "C" { ...@@ -23,28 +23,28 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include "taosdef.h" #include "taosdef.h"
#define TAOS_CONN_SERVER 0 #define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1 #define TAOS_CONN_CLIENT 1
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
typedef struct SRpcEpSet { typedef struct SRpcEpSet {
int8_t inUse; int8_t inUse;
int8_t numOfEps; int8_t numOfEps;
uint16_t port[TSDB_MAX_REPLICA]; uint16_t port[TSDB_MAX_REPLICA];
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcEpSet; } SRpcEpSet;
typedef struct SRpcCorEpSet { typedef struct SRpcCorEpSet {
int32_t version; int32_t version;
SRpcEpSet epSet; SRpcEpSet epSet;
} SRpcCorEpSet; } SRpcCorEpSet;
typedef struct SRpcConnInfo { typedef struct SRpcConnInfo {
uint32_t clientIp; uint32_t clientIp;
uint16_t clientPort; uint16_t clientPort;
uint32_t serverIp; uint32_t serverIp;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
} SRpcConnInfo; } SRpcConnInfo;
typedef struct SRpcMsg { typedef struct SRpcMsg {
...@@ -57,46 +57,47 @@ typedef struct SRpcMsg { ...@@ -57,46 +57,47 @@ typedef struct SRpcMsg {
} SRpcMsg; } SRpcMsg;
typedef struct SRpcInit { typedef struct SRpcInit {
uint16_t localPort; // local port uint16_t localPort; // local port
char *label; // for debug purpose char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed int sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled int idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
char spi; // security parameter index char spi; // security parameter index
char encrypt; // encrypt algorithm char encrypt; // encrypt algorithm
char *secret; // key for authentication char *secret; // key for authentication
char *ckey; // ciphering key char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg, code shall be ignored by server app
void (*cfp)(SRpcMsg *, SRpcEpSet *); void (*cfp)(SRpcMsg *, SRpcEpSet *);
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit; } SRpcInit;
int32_t rpcInit(); int32_t rpcInit();
void rpcCleanup(); void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc); void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcSendRecvWithTimeout(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcCancelRequest(int64_t rid); int rpcReportProgress(void *pConn, char *pCont, int contLen);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock); void rpcCancelRequest(int64_t rid);
// send rpc Refid connection probe alive message int32_t rpcUnusedSession(void *rpcInfo, bool bLock);
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver); // send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void *pPrevContext, bool *pReqOver);
// after sql request send , save conn info // after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext); bool rpcSaveSendInfo(int64_t rpcRid, void **ppContext);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,42 +20,46 @@ ...@@ -20,42 +20,46 @@
extern "C" { extern "C" {
#endif #endif
#if defined (_TD_DARWIN_64) #include <semaphore.h>
typedef struct tsem_s *tsem_t; #if defined(_TD_DARWIN_64)
int tsem_init(tsem_t *sem, int pshared, unsigned int value); typedef struct tsem_s *tsem_t;
int tsem_wait(tsem_t *sem); int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_post(tsem_t *sem); int tsem_wait(tsem_t *sem);
int tsem_destroy(tsem_t *sem); int tsem_post(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_destroy(tsem_t *sem);
#else #else
#define tsem_t sem_t #define tsem_t sem_t
#define tsem_init sem_init #define tsem_init sem_init
int tsem_wait(tsem_t* sem); int tsem_wait(tsem_t *sem);
#define tsem_post sem_post int tsem_timewait(tsem_t *sim, int64_t milis);
#define tsem_destroy sem_destroy #define tsem_post sem_post
#define tsem_destroy sem_destroy
#endif #endif
#if defined (_TD_DARWIN_64) #if defined(_TD_DARWIN_64)
#define pthread_rwlock_t pthread_mutex_t #define pthread_rwlock_t pthread_mutex_t
#define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL) #define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL)
#define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock) #define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock)
#define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock) #define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock)
#define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock) #define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock)
#define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock) #define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock)
#define pthread_spinlock_t pthread_mutex_t #define pthread_spinlock_t pthread_mutex_t
#define pthread_spin_init(lock, NULL) pthread_mutex_init(lock, NULL) #define pthread_spin_init(lock, NULL) pthread_mutex_init(lock, NULL)
#define pthread_spin_destroy(lock) pthread_mutex_destroy(lock) #define pthread_spin_destroy(lock) pthread_mutex_destroy(lock)
#define pthread_spin_lock(lock) pthread_mutex_lock(lock) #define pthread_spin_lock(lock) pthread_mutex_lock(lock)
#define pthread_spin_unlock(lock) pthread_mutex_unlock(lock) #define pthread_spin_unlock(lock) pthread_mutex_unlock(lock)
#endif #endif
bool taosCheckPthreadValid(pthread_t thread); bool taosCheckPthreadValid(pthread_t thread);
int64_t taosGetSelfPthreadId(); int64_t taosGetSelfPthreadId();
int64_t taosGetPthreadId(pthread_t thread); int64_t taosGetPthreadId(pthread_t thread);
void taosResetPthread(pthread_t* thread); void taosResetPthread(pthread_t *thread);
bool taosComparePthread(pthread_t first, pthread_t second); bool taosComparePthread(pthread_t first, pthread_t second);
int32_t taosGetPId(); int32_t taosGetPId();
int32_t taosGetCurrentAPPName(char* name, int32_t* len); int32_t taosGetCurrentAPPName(char *name, int32_t *len);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -561,6 +561,29 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) ...@@ -561,6 +561,29 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp)
return; return;
} }
void rpcSendRecvWithTimeout(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SRpcReqContext *pContext;
pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
memset(pRsp, 0, sizeof(SRpcMsg));
tsem_t sem;
tsem_init(&sem, 0, 0);
pContext->pSem = &sem;
pContext->pRsp = pRsp;
pContext->pSet = pEpSet;
int64_t rid = 0;
rpcSendRequest(shandle, pEpSet, pMsg, &rid);
tsem_timewait(&sem, 3 * 1000);
rpcCancelRequest(rid);
tsem_destroy(&sem);
pRsp->code = -1;
return;
}
// this API is used by server app to keep an APP context in case connection is broken // this API is used by server app to keep an APP context in case connection is broken
int rpcReportProgress(void *handle, char *pCont, int contLen) { int rpcReportProgress(void *handle, char *pCont, int contLen) {
SRpcConn *pConn = (SRpcConn *)handle; SRpcConn *pConn = (SRpcConn *)handle;
...@@ -1469,7 +1492,7 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1469,7 +1492,7 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
if (pConn->connType != RPC_CONN_TCPC) { if (pConn->connType != RPC_CONN_TCPC) {
pContext->code = terrno; pContext->code = terrno;
return BOOL_ASYNC; return BOOL_ASYNC;
} }
// try next ip again // try next ip again
pContext->code = terrno; pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
...@@ -1503,7 +1526,7 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -1503,7 +1526,7 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
if (writtenLen != msgLen) { if (writtenLen != msgLen) {
tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
ret = false; ret = false;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册