提交 b6225968 编写于 作者: J jtao1735

second draft

上级 fb27589c
...@@ -298,7 +298,6 @@ typedef struct STscObj { ...@@ -298,7 +298,6 @@ typedef struct STscObj {
char sversion[TSDB_VERSION_LEN]; char sversion[TSDB_VERSION_LEN];
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;
void* pMgmtConn;
struct SSqlObj * pSql; struct SSqlObj * pSql;
struct SSqlObj * pHb; struct SSqlObj * pHb;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
...@@ -358,7 +357,7 @@ typedef struct SSqlStream { ...@@ -358,7 +357,7 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next; struct SSqlStream *prev, *next;
} SSqlStream; } SSqlStream;
int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn); int32_t tscInitRpc(const char *user, const char *secret);
void tscInitMsgsFp(); void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
...@@ -425,7 +424,7 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql); ...@@ -425,7 +424,7 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column); char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column);
extern void * pVnodeConn; extern void * pDnodeConn;
extern void * tscCacheHandle; extern void * tscCacheHandle;
extern void * tscTmr; extern void * tscTmr;
extern void * tscQhandle; extern void * tscQhandle;
......
...@@ -191,7 +191,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -191,7 +191,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
} }
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen); char *pMsg = rpcMallocCont(pCmd->payloadLen);
...@@ -201,30 +200,22 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -201,30 +200,22 @@ int tscSendMsgToServer(SSqlObj *pSql) {
} }
if (pSql->cmd.command < TSDB_SQL_MGMT) { if (pSql->cmd.command < TSDB_SQL_MGMT) {
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
} else {
pSql->ipList = tscMgmtIpSet;
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
}
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType, .msgType = pSql->cmd.msgType,
.pCont = pMsg, .pCont = pMsg,
.contLen = pSql->cmd.payloadLen, .contLen = pSql->cmd.payloadLen,
.handle = pSql, .handle = pSql,
.code = 0 .code = 0
}; };
rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg); rpcSendRequest(pDnodeConn, &pSql->ipList, &rpcMsg);
} else {
pSql->ipList = tscMgmtIpSet;
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
.pCont = pMsg,
.contLen = pSql->cmd.payloadLen,
.handle = pSql,
.code = 0
};
tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
rpcSendRequest(pObj->pMgmtConn, &pSql->ipList, &rpcMsg);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -66,8 +66,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -66,8 +66,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return NULL; return NULL;
} }
void* pMgmtConn = NULL; if (tscInitRpc(user, pass) != 0) {
if (tscInitRpc(user, pass, &pMgmtConn) != 0) {
terrno = TSDB_CODE_NETWORK_UNAVAIL; terrno = TSDB_CODE_NETWORK_UNAVAIL;
return NULL; return NULL;
} }
...@@ -119,7 +118,6 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -119,7 +118,6 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
strtolower(pObj->db, tmp); strtolower(pObj->db, tmp);
} }
pObj->pMgmtConn = pMgmtConn;
pthread_mutex_init(&pObj->mutex, NULL); pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
#include "tlocale.h" #include "tlocale.h"
// global, not configurable // global, not configurable
void * pVnodeConn; void * pDnodeConn;
void * tscCacheHandle; void * tscCacheHandle;
void * tscTmr; void * tscTmr;
void * tscQhandle; void * tscQhandle;
...@@ -48,12 +48,12 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { ...@@ -48,12 +48,12 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) { int32_t tscInitRpc(const char *user, const char *secret) {
SRpcInit rpcInit; SRpcInit rpcInit;
char secretEncrypt[32] = {0}; char secretEncrypt[32] = {0};
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt); taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
if (pVnodeConn == NULL) { if (pDnodeConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "TSC-vnode"; rpcInit.label = "TSC-vnode";
...@@ -66,35 +66,13 @@ int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) { ...@@ -66,35 +66,13 @@ int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) {
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.secret = secretEncrypt; rpcInit.secret = secretEncrypt;
pVnodeConn = rpcOpen(&rpcInit); pDnodeConn = rpcOpen(&rpcInit);
if (pVnodeConn == NULL) { if (pDnodeConn == NULL) {
tscError("failed to init connection to vnode"); tscError("failed to init connection to vnode");
return -1; return -1;
} }
} }
if (*pMgmtConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.ufp = tscUpdateIpSet;
rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 2000;
rpcInit.user = (char*)user;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = secretEncrypt;
*pMgmtConn = rpcOpen(&rpcInit);
if (*pMgmtConn == NULL) {
tscError("failed to init connection to mgmt");
return -1;
}
}
return 0; return 0;
} }
...@@ -190,9 +168,9 @@ void taos_cleanup() { ...@@ -190,9 +168,9 @@ void taos_cleanup() {
taosCloseLog(); taosCloseLog();
if (pVnodeConn != NULL) { if (pDnodeConn != NULL) {
rpcClose(pVnodeConn); rpcClose(pDnodeConn);
pVnodeConn = NULL; pDnodeConn = NULL;
} }
taosTmrCleanUp(tscTmr); taosTmrCleanUp(tscTmr);
......
...@@ -760,7 +760,6 @@ void tscCloseTscObj(STscObj* pObj) { ...@@ -760,7 +760,6 @@ void tscCloseTscObj(STscObj* pObj) {
if (pSql) { if (pSql) {
sem_destroy(&pSql->rspSem); sem_destroy(&pSql->rspSem);
} }
rpcClose(pObj->pMgmtConn);
pthread_mutex_destroy(&pObj->mutex); pthread_mutex_destroy(&pObj->mutex);
......
...@@ -1244,7 +1244,6 @@ bool taosCheckGlobalCfg() { ...@@ -1244,7 +1244,6 @@ bool taosCheckGlobalCfg() {
tsVersion = 10 * tsVersion; tsVersion = 10 * tsVersion;
tsMnodeShellPort = tsServerPort + TSDB_PORT_MNODESHELL; // udp[6030-6034] tcp[6030]
tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035] tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035]
tsMnodeDnodePort = tsServerPort + TSDB_PORT_MNODEDNODE; // udp/tcp tsMnodeDnodePort = tsServerPort + TSDB_PORT_MNODEDNODE; // udp/tcp
tsDnodeMnodePort = tsServerPort + TSDB_PORT_DNODEMNODE; // udp/tcp tsDnodeMnodePort = tsServerPort + TSDB_PORT_DNODEMNODE; // udp/tcp
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "taosdef.h" #include "taosdef.h"
#include "tglobal.h" #include "tglobal.h"
#include "trpc.h"
#include "mnode.h" #include "mnode.h"
#include "http.h" #include "http.h"
#include "monitor.h" #include "monitor.h"
......
...@@ -34,6 +34,8 @@ static void * tsDnodeShellRpc = NULL; ...@@ -34,6 +34,8 @@ static void * tsDnodeShellRpc = NULL;
static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0;
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg);
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead;
...@@ -47,8 +49,8 @@ int32_t dnodeInitShell() { ...@@ -47,8 +49,8 @@ int32_t dnodeInitShell() {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeShellPort; rpcInit.localPort = tsMnodeShellPort;
rpcInit.label = "DND-shell"; rpcInit.label = "SHELL";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dnodeProcessMsgFromShell; rpcInit.cfp = dnodeProcessMsgFromShell;
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
...@@ -96,13 +98,11 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { ...@@ -96,13 +98,11 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) {
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
} else { } else {
dError("RPC %p, msg:%s from shell is not handled", pMsg->handle, taosMsg[pMsg->msgType]); mgmtProcessMsgFromShell(pMsg);
rpcMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
} }
} }
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -26,6 +26,8 @@ void mgmtCleanUpSystem(); ...@@ -26,6 +26,8 @@ void mgmtCleanUpSystem();
void mgmtStopSystem(); void mgmtStopSystem();
void sdbUpdateSync(); void sdbUpdateSync();
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -329,8 +329,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -329,8 +329,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_NORMAL_TABLES 1000 #define TSDB_MAX_NORMAL_TABLES 1000
#define TSDB_MAX_CHILD_TABLES 100000 #define TSDB_MAX_CHILD_TABLES 100000
#define TSDB_PORT_MNODESHELL 0 #define TSDB_PORT_DNODESHELL 0
#define TSDB_PORT_DNODESHELL 5
#define TSDB_PORT_DNODEMNODE 10 #define TSDB_PORT_DNODEMNODE 10
#define TSDB_PORT_MNODEDNODE 15 #define TSDB_PORT_MNODEDNODE 15
#define TSDB_PORT_SYNC 20 #define TSDB_PORT_SYNC 20
......
...@@ -41,9 +41,9 @@ ...@@ -41,9 +41,9 @@
typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); //static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg);
static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); //static void mgmtProcessMsgFromShell(SRpcMsg *pMsg);
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg);
static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg);
...@@ -52,7 +52,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg); ...@@ -52,7 +52,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg); static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg);
void *tsMgmtTmr; void *tsMgmtTmr;
static void *tsMgmtShellRpc = NULL; //static void *tsMgmtShellRpc = NULL;
static void *tsMgmtTranQhandle = NULL; static void *tsMgmtTranQhandle = NULL;
static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0}; static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0};
static void *tsQhandleCache = NULL; static void *tsQhandleCache = NULL;
...@@ -70,28 +70,6 @@ int32_t mgmtInitShell() { ...@@ -70,28 +70,6 @@ int32_t mgmtInitShell() {
tsMgmtTranQhandle = taosInitScheduler(tsMaxShellConns, 1, "mnodeT"); tsMgmtTranQhandle = taosInitScheduler(tsMaxShellConns, 1, "mnodeT");
tsQhandleCache = taosCacheInit(tsMgmtTmr, 2); tsQhandleCache = taosCacheInit(tsMgmtTmr, 2);
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0;
if (numOfThreads < 1) {
numOfThreads = 1;
}
SRpcInit rpcInit = {0};
rpcInit.localPort = tsMnodeShellPort;
rpcInit.label = "MND-shell";
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = mgmtProcessMsgFromShell;
rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.afp = mgmtShellRetriveAuth;
tsMgmtShellRpc = rpcOpen(&rpcInit);
if (tsMgmtShellRpc == NULL) {
mError("failed to init server connection to shell");
return -1;
}
mPrint("server connection to shell is opened");
return 0; return 0;
} }
...@@ -101,12 +79,6 @@ void mgmtCleanUpShell() { ...@@ -101,12 +79,6 @@ void mgmtCleanUpShell() {
tsMgmtTranQhandle = NULL; tsMgmtTranQhandle = NULL;
} }
if (tsMgmtShellRpc) {
rpcClose(tsMgmtShellRpc);
tsMgmtShellRpc = NULL;
mPrint("server connection to shell is closed");
}
if (tsQhandleCache) { if (tsQhandleCache) {
taosCacheEmpty(tsQhandleCache); taosCacheEmpty(tsQhandleCache);
taosCacheCleanup(tsQhandleCache); taosCacheCleanup(tsQhandleCache);
...@@ -148,7 +120,7 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { ...@@ -148,7 +120,7 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
taosTmrReset(mgmtDoDealyedAddToShellQueue, 1000, queuedMsg, tsMgmtTmr, &unUsed); taosTmrReset(mgmtDoDealyedAddToShellQueue, 1000, queuedMsg, tsMgmtTmr, &unUsed);
} }
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
assert(rpcMsg); assert(rpcMsg);
if (rpcMsg->pCont == NULL) { if (rpcMsg->pCont == NULL) {
...@@ -370,6 +342,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { ...@@ -370,6 +342,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
/*
static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
*spi = 1; *spi = 1;
*encrypt = 0; *encrypt = 0;
...@@ -390,6 +363,7 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr ...@@ -390,6 +363,7 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
*/
static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册