未验证 提交 11824a52 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22193 from taosdata/fix/TS-3727-2.6

add timeout to auth
......@@ -13,58 +13,58 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/* this file is mainly responsible for the communication between DNODEs. Each
/* this file is mainly responsible for the communication between DNODEs. Each
* dnode works as both server and client. Dnode may send status, grant, config
* messages to mnode, mnode may send create/alter/drop table/vnode messages
* messages to mnode, mnode may send create/alter/drop table/vnode messages
* to dnode. All theses messages are handled from here
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "dnodeVMgmt.h"
#include "dnodeVWrite.h"
#include "dnodeMPeer.h"
#include "dnodeMInfos.h"
#include "dnodeMPeer.h"
#include "dnodeStep.h"
#include "dnodeVMgmt.h"
#include "dnodeVWrite.h"
#include "mnode.h"
#include "os.h"
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *);
static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
static void *tsServerRpc = NULL;
static void *tsClientRpc = NULL;
int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue;
SRpcInit rpcInitial;
memset(&rpcInitial, 0, sizeof(rpcInitial));
rpcInitial.localPort = tsDnodeDnodePort;
rpcInitial.label = "DND-S";
rpcInitial.localPort = tsDnodeDnodePort;
rpcInitial.label = "DND-S";
rpcInitial.numOfThreads = 1;
rpcInitial.cfp = dnodeProcessReqMsgFromDnode;
rpcInitial.sessions = TSDB_MAX_VNODES << 4;
rpcInitial.connType = TAOS_CONN_SERVER;
rpcInitial.idleTime = tsShellActivityTimer * 1000;
rpcInitial.cfp = dnodeProcessReqMsgFromDnode;
rpcInitial.sessions = TSDB_MAX_VNODES << 4;
rpcInitial.connType = TAOS_CONN_SERVER;
rpcInitial.idleTime = tsShellActivityTimer * 1000;
tsServerRpc = rpcOpen(&rpcInitial);
if (tsServerRpc == NULL) {
......@@ -85,11 +85,7 @@ void dnodeCleanupServer() {
}
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = {
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0
};
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
if (pMsg->pCont == NULL) return;
if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
......@@ -122,18 +118,18 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
int32_t dnodeInitClient() {
char secret[TSDB_KEY_LEN] = "secret";
char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInitial;
memset(&rpcInitial, 0, sizeof(rpcInitial));
rpcInitial.label = "DND-C";
rpcInitial.label = "DND-C";
rpcInitial.numOfThreads = 1;
rpcInitial.cfp = dnodeProcessRspFromDnode;
rpcInitial.sessions = TSDB_MAX_VNODES << 4;
rpcInitial.connType = TAOS_CONN_CLIENT;
rpcInitial.idleTime = tsShellActivityTimer * 1000;
rpcInitial.user = "t";
rpcInitial.ckey = "key";
rpcInitial.secret = secret;
rpcInitial.cfp = dnodeProcessRspFromDnode;
rpcInitial.sessions = TSDB_MAX_VNODES << 4;
rpcInitial.connType = TAOS_CONN_CLIENT;
rpcInitial.idleTime = tsShellActivityTimer * 1000;
rpcInitial.user = "t";
rpcInitial.ckey = "key";
rpcInitial.secret = secret;
tsClientRpc = rpcOpen(&rpcInitial);
if (tsClientRpc == NULL) {
......@@ -165,7 +161,7 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
dnodeUpdateEpSetForPeer(pEpSet);
}
if (dnodeProcessRspMsgFp[pMsg->msgType]) {
if (dnodeProcessRspMsgFp[pMsg->msgType]) {
(*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg);
} else {
mnodeProcessPeerRsp(pMsg);
......@@ -174,13 +170,9 @@ static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
rpcFreeCont(pMsg->pCont);
}
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
dnodeProcessRspMsgFp[msgType] = fp;
}
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { dnodeProcessRspMsgFp[msgType] = fp; }
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL);
}
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsClientRpc, epSet, rpcMsg, NULL); }
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
......@@ -189,6 +181,13 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
assert(tsClientRpc != 0);
rpcSendRecv(tsClientRpc, &epSet, rpcMsg, rpcRsp);
}
void dnodeSendMsgToMnodeRecvWithTimeout(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(&epSet);
assert(tsClientRpc != 0);
rpcSendRecvWithTimeout(tsClientRpc, &epSet, rpcMsg, rpcRsp);
}
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
rpcSendRecv(tsClientRpc, epSet, rpcMsg, rpcRsp);
......
......@@ -14,70 +14,70 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "http.h"
#include "mnode.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "dnodeShell.h"
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
#include "dnodeShell.h"
#include "dnodeStep.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "http.h"
#include "mnode.h"
#include "os.h"
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void * tsShellRpc = NULL;
static int64_t tsQueryReqNum = 0;
static void *tsShellRpc = NULL;
static int64_t tsQueryReqNum = 0;
static int64_t tsSubmitReqNum = 0;
int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE]= dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = dnodeDispatchToMWriteQueue;
// the following message shall be treated as mnode query
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
......@@ -86,14 +86,14 @@ int32_t dnodeInitShell() {
SRpcInit rpcInitial;
memset(&rpcInitial, 0, sizeof(rpcInitial));
rpcInitial.localPort = tsDnodeShellPort;
rpcInitial.label = "SHELL";
rpcInitial.localPort = tsDnodeShellPort;
rpcInitial.label = "SHELL";
rpcInitial.numOfThreads = numOfThreads;
rpcInitial.cfp = dnodeProcessMsgFromShell;
rpcInitial.sessions = tsMaxShellConns;
rpcInitial.connType = TAOS_CONN_SERVER;
rpcInitial.idleTime = tsShellActivityTimer * 1000;
rpcInitial.afp = dnodeRetrieveUserAuthInfo;
rpcInitial.cfp = dnodeProcessMsgFromShell;
rpcInitial.sessions = tsMaxShellConns;
rpcInitial.connType = TAOS_CONN_SERVER;
rpcInitial.idleTime = tsShellActivityTimer * 1000;
rpcInitial.afp = dnodeRetrieveUserAuthInfo;
tsShellRpc = rpcOpen(&rpcInitial);
if (tsShellRpc == NULL) {
......@@ -113,11 +113,7 @@ void dnodeCleanupShell() {
}
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0
};
SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
if (pMsg->pCont == NULL) return;
......@@ -148,9 +144,10 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
atomic_fetch_add_64(&tsQueryReqNum, 1);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
atomic_fetch_add_64(&tsSubmitReqNum, 1);
} else {}
} else {
}
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
if (dnodeProcessShellMsgFp[pMsg->msgType]) {
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
} else {
dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
......@@ -191,10 +188,10 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0};
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
dnodeSendMsgToMnodeRecvWithTimeout(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
dError("user:%s, auth msg received from mnodes, error:%s, may be timeout", user, tstrerror(rpcRsp.code));
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
dDebug("user:%s, auth msg received from mnodes", user);
......@@ -211,7 +208,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
dDebug("vgId:%d, tid:%d send config table msg to mnode", vgId, tid);
int32_t contLen = sizeof(SConfigTableMsg);
int32_t contLen = sizeof(SConfigTableMsg);
SConfigTableMsg *pMsg = rpcMallocCont(contLen);
pMsg->dnodeId = htonl(dnodeGetDnodeId());
......@@ -236,11 +233,12 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
// delete this after debug finished
SMDCreateTableMsg *pTable = rpcRsp.pCont;
int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags);
int32_t tableId = htonl(pTable->tid);
uint64_t uid = htobe64(pTable->uid);
dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableFname, numOfColumns, numOfTags, tableId, uid);
int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags);
int32_t tableId = htonl(pTable->tid);
uint64_t uid = htobe64(pTable->uid);
dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableFname, numOfColumns, numOfTags,
tableId, uid);
return rpcRsp.pCont;
}
......@@ -250,9 +248,9 @@ SDnodeStatisInfo dnodeGetStatisInfo() {
SDnodeStatisInfo info = {0};
if (dnodeGetRunStatus() == TSDB_RUN_STATUS_RUNING) {
#ifdef HTTP_EMBEDDED
info.httpReqNum = httpGetReqCount();
info.httpReqNum = httpGetReqCount();
#endif
info.queryReqNum = atomic_exchange_64(&tsQueryReqNum, 0);
info.queryReqNum = atomic_exchange_64(&tsQueryReqNum, 0);
info.submitReqNum = atomic_exchange_64(&tsSubmitReqNum, 0);
}
......@@ -262,7 +260,7 @@ SDnodeStatisInfo dnodeGetStatisInfo() {
int32_t dnodeGetHttpStatusInfo(int32_t idx) {
int32_t httpStatus = 0;
#ifdef HTTP_EMBEDDED
httpStatus = httpGetStatusCodeCount(idx);
httpStatus = httpGetStatusCodeCount(idx);
#endif
return httpStatus;
}
......
......@@ -20,8 +20,8 @@
extern "C" {
#endif
#include "trpc.h"
#include "taosmsg.h"
#include "trpc.h"
#define MAX_HTTP_STATUS_CODE_NUM 63
typedef struct {
......@@ -50,6 +50,7 @@ int32_t dnodeStartMnode(SMInfos *pMinfos);
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToMnodeRecvWithTimeout(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
......@@ -71,7 +72,7 @@ void dnodeSendRpcMWriteRsp(void *pMsg, int32_t code);
void dnodeReprocessMWriteMsg(void *pMsg);
void dnodeDelayReprocessMWriteMsg(void *pMsg);
void dnodeSendStatusMsgToMnode();
void dnodeSendStatusMsgToMnode();
typedef struct {
char *name;
......
......@@ -23,28 +23,28 @@ extern "C" {
#include <stdint.h>
#include "taosdef.h"
#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
extern int tsRpcHeadSize;
typedef struct SRpcEpSet {
int8_t inUse;
int8_t numOfEps;
uint16_t port[TSDB_MAX_REPLICA];
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
int8_t inUse;
int8_t numOfEps;
uint16_t port[TSDB_MAX_REPLICA];
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcEpSet;
typedef struct SRpcCorEpSet {
int32_t version;
SRpcEpSet epSet;
int32_t version;
SRpcEpSet epSet;
} SRpcCorEpSet;
typedef struct SRpcConnInfo {
uint32_t clientIp;
uint16_t clientPort;
uint32_t serverIp;
char user[TSDB_USER_LEN];
uint32_t clientIp;
uint16_t clientPort;
uint32_t serverIp;
char user[TSDB_USER_LEN];
} SRpcConnInfo;
typedef struct SRpcMsg {
......@@ -57,46 +57,47 @@ typedef struct SRpcMsg {
} SRpcMsg;
typedef struct SRpcInit {
uint16_t localPort; // local port
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
uint16_t localPort; // local port
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client app ecurity only
char *user; // user name
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
char *user; // user name
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app
void (*cfp)(SRpcMsg *, SRpcEpSet *);
void (*cfp)(SRpcMsg *, SRpcEpSet *);
// call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
// call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit;
int32_t rpcInit();
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
// send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver);
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
TBOOL rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcSendRecvWithTimeout(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void *rpcInfo, bool bLock);
// send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void *pPrevContext, bool *pReqOver);
// after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext);
bool rpcSaveSendInfo(int64_t rpcRid, void **ppContext);
#ifdef __cplusplus
}
......
......@@ -20,42 +20,46 @@
extern "C" {
#endif
#if defined (_TD_DARWIN_64)
typedef struct tsem_s *tsem_t;
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_wait(tsem_t *sem);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#include <semaphore.h>
#if defined(_TD_DARWIN_64)
typedef struct tsem_s *tsem_t;
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_wait(tsem_t *sem);
int tsem_post(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_destroy(tsem_t *sem);
#else
#define tsem_t sem_t
#define tsem_init sem_init
int tsem_wait(tsem_t* sem);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
#define tsem_t sem_t
#define tsem_init sem_init
int tsem_wait(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t milis);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
#endif
#if defined (_TD_DARWIN_64)
#define pthread_rwlock_t pthread_mutex_t
#define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL)
#define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock)
#define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock)
#define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock)
#define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock)
#if defined(_TD_DARWIN_64)
#define pthread_rwlock_t pthread_mutex_t
#define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL)
#define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock)
#define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock)
#define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock)
#define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock)
#define pthread_spinlock_t pthread_mutex_t
#define pthread_spin_init(lock, NULL) pthread_mutex_init(lock, NULL)
#define pthread_spin_destroy(lock) pthread_mutex_destroy(lock)
#define pthread_spin_lock(lock) pthread_mutex_lock(lock)
#define pthread_spin_unlock(lock) pthread_mutex_unlock(lock)
#define pthread_spinlock_t pthread_mutex_t
#define pthread_spin_init(lock, NULL) pthread_mutex_init(lock, NULL)
#define pthread_spin_destroy(lock) pthread_mutex_destroy(lock)
#define pthread_spin_lock(lock) pthread_mutex_lock(lock)
#define pthread_spin_unlock(lock) pthread_mutex_unlock(lock)
#endif
bool taosCheckPthreadValid(pthread_t thread);
int64_t taosGetSelfPthreadId();
int64_t taosGetPthreadId(pthread_t thread);
void taosResetPthread(pthread_t* thread);
void taosResetPthread(pthread_t *thread);
bool taosComparePthread(pthread_t first, pthread_t second);
int32_t taosGetPId();
int32_t taosGetCurrentAPPName(char* name, int32_t* len);
int32_t taosGetCurrentAPPName(char *name, int32_t *len);
#ifdef __cplusplus
}
......
......@@ -195,6 +195,7 @@ int tsem_wait(tsem_t *sem) {
#endif // SEM_USE_PTHREAD
}
int tsem_post(tsem_t *sem) {
if (!*sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem);
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#if !defined (_TD_DARWIN_64)
#if !defined(_TD_DARWIN_64)
int32_t tsem_wait(tsem_t* sem) {
int ret = 0;
......@@ -28,7 +28,25 @@ int32_t tsem_wait(tsem_t* sem) {
#endif
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) || defined (_TD_DARWIN_64))
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) || defined(_TD_DARWIN_64))
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
int ret = 0;
struct timespec ts = {0};
if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
return -1;
}
ts.tv_nsec += ms * 1000000;
ts.tv_sec += ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue;
return ret;
}
bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }
......@@ -40,13 +58,13 @@ int64_t taosGetSelfPthreadId() {
}
int64_t taosGetPthreadId(pthread_t thread) { return (int64_t)thread; }
void taosResetPthread(pthread_t *thread) { *thread = 0; }
void taosResetPthread(pthread_t* thread) { *thread = 0; }
bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; }
int32_t taosGetPId() { return getpid(); }
int32_t taosGetCurrentAPPName(char *name, int32_t* len) {
int32_t taosGetCurrentAPPName(char* name, int32_t* len) {
const char* self = "/proc/self/exe";
char path[PATH_MAX] = {0};
char path[PATH_MAX] = {0};
if (readlink(self, path, PATH_MAX) <= 0) {
return -1;
......@@ -70,3 +88,44 @@ int32_t taosGetCurrentAPPName(char *name, int32_t* len) {
}
#endif
#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
return tsem_wait(sem);
// struct timespec ts;
// taosClockGetTime(0, &ts);
// ts.tv_nsec += ms * 1000000;
// ts.tv_sec += ts.tv_nsec / 1000000000;
// ts.tv_nsec %= 1000000000;
// int rc;
// while ((rc = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue;
// return rc;
// /* This should have timed out */
// // ASSERT(errno == ETIMEDOUT);
// // ASSERT(rc != 0);
// // GetSystemTimeAsFileTime(&ft_after);
// // // We specified a non-zero wait. Time must advance.
// // if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime)
// // {
// // printf("nanoseconds: %d, rc: %d, code:0x%x. before filetime: %d, %d; after filetime: %d, %d\n",
// // nanosecs, rc, errno,
// // (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime,
// // (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime);
// // printf("time must advance during sem_timedwait.");
// // return 1;
// // }
}
#endif
// #if defined(_TD_DARWIN_64)
// int tsem_timewait(tsem_t* psem, int64_t milis) {
// if (psem == NULL || *psem == NULL) return -1;
// dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC));
// dispatch_semaphore_wait(*psem, time);
// return 0;
// }
// #endif
\ No newline at end of file
......@@ -561,6 +561,37 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp)
return;
}
void rpcSendRecvWithTimeout(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SRpcReqContext *pContext;
pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
memset(pRsp, 0, sizeof(SRpcMsg));
tsem_t sem;
tsem_init(&sem, 0, 0);
pContext->pSem = &sem;
pContext->pRsp = pRsp;
pContext->pSet = pEpSet;
int64_t rid = 0;
rpcSendRequest(shandle, pEpSet, pMsg, &rid);
#if defined (LINUX)
if (tsem_timewait(&sem, 3 * 1000) == 0) {
// do nothing
} else {
rpcCancelRequest(rid);
pRsp->code = -1;
}
#else
tsem_wait(&sem);
#endif
tsem_destroy(&sem);
return;
}
// this API is used by server app to keep an APP context in case connection is broken
int rpcReportProgress(void *handle, char *pCont, int contLen) {
SRpcConn *pConn = (SRpcConn *)handle;
......@@ -1469,7 +1500,7 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
if (pConn->connType != RPC_CONN_TCPC) {
pContext->code = terrno;
return BOOL_ASYNC;
}
}
// try next ip again
pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
......@@ -1503,7 +1534,7 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
if (writtenLen != msgLen) {
tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
terrno = TAOS_SYSTEM_ERROR(errno);
ret = false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册