提交 cee2e18b 编写于 作者: S slguan

change message name

上级 cdee51f6
...@@ -209,16 +209,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { ...@@ -209,16 +209,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
} }
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
SCMQqueryList *pQList = (SCMQqueryList *)pMsg; SQqueryList *pQList = (SQqueryList *)pMsg;
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256; char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256;
SCMQueryDesc *pQdesc = pQList->qdesc; SQueryDesc *pQdesc = pQList->qdesc;
pQList->numOfQueries = 0; pQList->numOfQueries = 0;
// We extract the lock to tscBuildHeartBeatMsg function. // We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */ /* pthread_mutex_lock (&pObj->mutex); */
pMsg += sizeof(SCMQqueryList); pMsg += sizeof(SQqueryList);
SSqlObj *pSql = pObj->sqlList; SSqlObj *pSql = pObj->sqlList;
while (pSql) { while (pSql) {
/* /*
...@@ -239,15 +239,15 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { ...@@ -239,15 +239,15 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
pQList->numOfQueries++; pQList->numOfQueries++;
pQdesc++; pQdesc++;
pSql = pSql->next; pSql = pSql->next;
pMsg += sizeof(SCMQueryDesc); pMsg += sizeof(SQueryDesc);
if (pMsg > pMax) break; if (pMsg > pMax) break;
} }
SCMStreamList *pSList = (SCMStreamList *)pMsg; SStreamList *pSList = (SStreamList *)pMsg;
SCMStreamDesc *pSdesc = pSList->sdesc; SStreamDesc *pSdesc = pSList->sdesc;
pSList->numOfStreams = 0; pSList->numOfStreams = 0;
pMsg += sizeof(SCMStreamList); pMsg += sizeof(SStreamList);
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {
strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
...@@ -265,7 +265,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { ...@@ -265,7 +265,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
pSList->numOfStreams++; pSList->numOfStreams++;
pSdesc++; pSdesc++;
pStream = pStream->next; pStream = pStream->next;
pMsg += sizeof(SCMStreamDesc); pMsg += sizeof(SStreamDesc);
if (pMsg > pMax) break; if (pMsg > pMax) break;
} }
......
...@@ -116,7 +116,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -116,7 +116,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (code == 0) { if (code == 0) {
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp;
SRpcIpSet * pIpList = &pRsp->ipList; SRpcIpSet * pIpList = &pRsp->ipList;
tscSetMgmtIpList(pIpList); tscSetMgmtIpList(pIpList);
...@@ -1693,13 +1693,13 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1693,13 +1693,13 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCMCreateAcctMsg); pCmd->payloadLen = sizeof(SCreateAcctMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload; SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload;
SSQLToken *pName = &pInfo->pDCLInfo->user.user; SSQLToken *pName = &pInfo->pDCLInfo->user.user;
SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd; SSQLToken *pPwd = &pInfo->pDCLInfo->user.passwd;
...@@ -1737,13 +1737,13 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1737,13 +1737,13 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMCreateUserMsg *pAlterMsg; SCreateUserMsg *pAlterMsg;
char * pMsg, *pStart; char * pMsg, *pStart;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pMsg = doBuildMsgHeader(pSql, &pStart); pMsg = doBuildMsgHeader(pSql, &pStart);
pAlterMsg = (SCMCreateUserMsg *)pMsg; pAlterMsg = (SCreateUserMsg *)pMsg;
SUserInfo *pUser = &pInfo->pDCLInfo->user; SUserInfo *pUser = &pInfo->pDCLInfo->user;
strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n); strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
...@@ -1758,7 +1758,7 @@ int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1758,7 +1758,7 @@ int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n); strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
} }
pMsg += sizeof(SCMCreateUserMsg); pMsg += sizeof(SCreateUserMsg);
pCmd->payloadLen = pMsg - pStart; pCmd->payloadLen = pMsg - pStart;
if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) { if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
...@@ -1871,18 +1871,18 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1871,18 +1871,18 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMDropUserMsg *pDropMsg; SDropUserMsg *pDropMsg;
char * pMsg, *pStart; char * pMsg, *pStart;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pMsg = doBuildMsgHeader(pSql, &pStart); pMsg = doBuildMsgHeader(pSql, &pStart);
pDropMsg = (SCMDropUserMsg *)pMsg; pDropMsg = (SDropUserMsg *)pMsg;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropMsg->user, pMeterMetaInfo->name); strcpy(pDropMsg->user, pMeterMetaInfo->name);
pMsg += sizeof(SCMDropUserMsg); pMsg += sizeof(SDropUserMsg);
pCmd->payloadLen = pMsg - pStart; pCmd->payloadLen = pMsg - pStart;
pCmd->msgType = TSDB_MSG_TYPE_DROP_USER; pCmd->msgType = TSDB_MSG_TYPE_DROP_USER;
...@@ -2334,14 +2334,14 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2334,14 +2334,14 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_CONNECT; pCmd->msgType = TSDB_MSG_TYPE_CONNECT;
pCmd->payloadLen = sizeof(SCMConnectMsg); pCmd->payloadLen = sizeof(SConnectMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload; SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload;
char *db; // ugly code to move the space char *db; // ugly code to move the space
db = strstr(pObj->db, TS_PATH_DELIMITER); db = strstr(pObj->db, TS_PATH_DELIMITER);
...@@ -2611,18 +2611,18 @@ int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) { ...@@ -2611,18 +2611,18 @@ int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
size += tsRpcHeadSize + sizeof(SMgmtHead); size += tsRpcHeadSize + sizeof(SMgmtHead);
size += sizeof(SCMQqueryList); size += sizeof(SQqueryList);
SSqlObj *tpSql = pObj->sqlList; SSqlObj *tpSql = pObj->sqlList;
while (tpSql) { while (tpSql) {
size += sizeof(SCMQueryDesc); size += sizeof(SQueryDesc);
tpSql = tpSql->next; tpSql = tpSql->next;
} }
size += sizeof(SCMStreamList); size += sizeof(SStreamList);
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {
size += sizeof(SCMStreamDesc); size += sizeof(SStreamDesc);
pStream = pStream->next; pStream = pStream->next;
} }
...@@ -3043,7 +3043,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -3043,7 +3043,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp; SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp;
strcpy(pObj->acctId, pConnect->acctId); // copy acctId from response strcpy(pObj->acctId, pConnect->acctId); // copy acctId from response
int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);
...@@ -3051,7 +3051,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -3051,7 +3051,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
strncpy(pObj->db, temp, tListLen(pObj->db)); strncpy(pObj->db, temp, tListLen(pObj->db));
// SIpList * pIpList; // SIpList * pIpList;
// char *rsp = pRes->pRsp + sizeof(SCMConnectRsp); // char *rsp = pRes->pRsp + sizeof(SConnectRsp);
// pIpList = (SIpList *)rsp; // pIpList = (SIpList *)rsp;
// tscSetMgmtIpList(pIpList); // tscSetMgmtIpList(pIpList);
......
...@@ -257,8 +257,8 @@ typedef struct _user_obj { ...@@ -257,8 +257,8 @@ typedef struct _user_obj {
char updateEnd[1]; char updateEnd[1];
struct _user_obj *prev, *next; struct _user_obj *prev, *next;
struct _acctObj * pAcct; struct _acctObj * pAcct;
SCMQqueryList * pQList; // query list SQqueryList * pQList; // query list
SCMStreamList * pSList; // stream list SStreamList * pSList; // stream list
} SUserObj; } SUserObj;
typedef struct { typedef struct {
...@@ -282,7 +282,7 @@ typedef struct { ...@@ -282,7 +282,7 @@ typedef struct {
typedef struct _acctObj { typedef struct _acctObj {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
SCMAcctCfg cfg; SAcctCfg cfg;
int32_t acctId; int32_t acctId;
int64_t createdTime; int64_t createdTime;
int8_t reserved[15]; int8_t reserved[15];
......
...@@ -315,7 +315,7 @@ typedef struct { ...@@ -315,7 +315,7 @@ typedef struct {
char clientVersion[TSDB_VERSION_LEN]; char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_ID_LEN]; char db[TSDB_TABLE_ID_LEN];
} SCMConnectMsg; } SConnectMsg;
typedef struct { typedef struct {
char acctId[TSDB_ACCT_LEN]; char acctId[TSDB_ACCT_LEN];
...@@ -323,7 +323,7 @@ typedef struct { ...@@ -323,7 +323,7 @@ typedef struct {
int8_t writeAuth; int8_t writeAuth;
int8_t superAuth; int8_t superAuth;
SRpcIpSet ipList; SRpcIpSet ipList;
} SCMConnectRsp; } SConnectRsp;
typedef struct { typedef struct {
int32_t maxUsers; int32_t maxUsers;
...@@ -337,24 +337,24 @@ typedef struct { ...@@ -337,24 +337,24 @@ typedef struct {
int64_t maxInbound; int64_t maxInbound;
int64_t maxOutbound; int64_t maxOutbound;
int8_t accessState; // Configured only by command int8_t accessState; // Configured only by command
} SCMAcctCfg; } SAcctCfg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
SCMAcctCfg cfg; SAcctCfg cfg;
} SCMCreateAcctMsg, SCMAlterAcctMsg; } SCreateAcctMsg, SAlterAcctMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
} SCMDropUserMsg, SCMDropAcctMsg; } SDropUserMsg, SDropAcctMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
int8_t privilege; int8_t privilege;
int8_t flag; int8_t flag;
} SCMCreateUserMsg, SCMAlterUserMsg; } SCreateUserMsg, SAlterUserMsg;
typedef struct { typedef struct {
char db[TSDB_TABLE_ID_LEN]; char db[TSDB_TABLE_ID_LEN];
...@@ -785,7 +785,7 @@ typedef struct { ...@@ -785,7 +785,7 @@ typedef struct {
uint32_t queryId; uint32_t queryId;
int64_t useconds; int64_t useconds;
int64_t stime; int64_t stime;
} SCMQueryDesc; } SQueryDesc;
typedef struct { typedef struct {
char sql[TSDB_SHOW_SQL_LEN]; char sql[TSDB_SHOW_SQL_LEN];
...@@ -796,29 +796,29 @@ typedef struct { ...@@ -796,29 +796,29 @@ typedef struct {
int64_t stime; int64_t stime;
int64_t slidingTime; int64_t slidingTime;
int64_t interval; int64_t interval;
} SCMStreamDesc; } SStreamDesc;
typedef struct { typedef struct {
int32_t numOfQueries; int32_t numOfQueries;
SCMQueryDesc qdesc[]; SQueryDesc qdesc[];
} SCMQqueryList; } SQqueryList;
typedef struct { typedef struct {
int32_t numOfStreams; int32_t numOfStreams;
SCMStreamDesc sdesc[]; SStreamDesc sdesc[];
} SCMStreamList; } SStreamList;
typedef struct { typedef struct {
SCMQqueryList qlist; SQqueryList qlist;
SCMStreamList slist; SStreamList slist;
} SCMHeartBeatMsg; } SHeartBeatMsg;
typedef struct { typedef struct {
uint32_t queryId; uint32_t queryId;
uint32_t streamId; uint32_t streamId;
int8_t killConnection; int8_t killConnection;
SRpcIpSet ipList; SRpcIpSet ipList;
} SCMHeartBeatRsp; } SHeartBeatRsp;
typedef struct { typedef struct {
uint64_t handle; uint64_t handle;
......
...@@ -30,7 +30,7 @@ int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -30,7 +30,7 @@ int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pCo
int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg); int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg);
int32_t mgmtKillQuery(char *qidstr, void *pConn); int32_t mgmtKillQuery(char *qidstr, void *pConn);
......
...@@ -27,7 +27,7 @@ extern "C" { ...@@ -27,7 +27,7 @@ extern "C" {
int32_t mgmtInitShell(); int32_t mgmtInitShell();
void mgmtCleanUpShell(); void mgmtCleanUpShell();
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn, int32_t msgType); extern int32_t (*mgmtCheckRedirectMsg)(void *pConn);
extern int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle); extern int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
......
...@@ -155,3 +155,14 @@ static int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int32_t rows, v ...@@ -155,3 +155,14 @@ static int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int32_t rows, v
} }
int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveAcctsImp; int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveAcctsImp;
SAcctObj *mgmtGetAcctFromConn(void *pConn) {
SRpcConnInfo connInfo;
rpcGetConnInfo(pConn, &connInfo);
SUserObj *pUser = mgmtGetUser(connInfo.user);
if(pUser != NULL) {
return pUser->pAcct;
}
return NULL;
}
...@@ -32,7 +32,7 @@ typedef struct { ...@@ -32,7 +32,7 @@ typedef struct {
int32_t numOfQueries; int32_t numOfQueries;
SCDesc * connInfo; SCDesc * connInfo;
SCDesc **cdesc; SCDesc **cdesc;
SCMQueryDesc qdesc[]; SQueryDesc qdesc[];
} SQueryShow; } SQueryShow;
typedef struct { typedef struct {
...@@ -40,10 +40,10 @@ typedef struct { ...@@ -40,10 +40,10 @@ typedef struct {
int32_t numOfStreams; int32_t numOfStreams;
SCDesc * connInfo; SCDesc * connInfo;
SCDesc **cdesc; SCDesc **cdesc;
SCMStreamDesc sdesc[]; SStreamDesc sdesc[];
} SStreamShow; } SStreamShow;
int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) { int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg) {
// SAcctObj *pAcct = pConn->pAcct; // SAcctObj *pAcct = pConn->pAcct;
// //
// if (contLen <= 0 || pAcct == NULL) { // if (contLen <= 0 || pAcct == NULL) {
...@@ -60,7 +60,7 @@ int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) { ...@@ -60,7 +60,7 @@ int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) {
// pConn->pQList = realloc(pConn->pQList, contLen); // pConn->pQList = realloc(pConn->pQList, contLen);
// memcpy(pConn->pQList, cont, contLen); // memcpy(pConn->pQList, cont, contLen);
// //
// pConn->pSList = (SCMStreamList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SCMQueryDesc) + sizeof(SCMQqueryList)); // pConn->pSList = (SStreamList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SQueryDesc) + sizeof(SQqueryList));
// //
// pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries; // pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries;
// pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams; // pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams;
...@@ -76,7 +76,7 @@ int32_t mgmtGetQueries(SShowObj *pShow, void *pConn) { ...@@ -76,7 +76,7 @@ int32_t mgmtGetQueries(SShowObj *pShow, void *pConn) {
// //
// pthread_mutex_lock(&pAcct->mutex); // pthread_mutex_lock(&pAcct->mutex);
// //
// pQueryShow = malloc(sizeof(SCMQueryDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow)); // pQueryShow = malloc(sizeof(SQueryDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow));
// pQueryShow->numOfQueries = 0; // pQueryShow->numOfQueries = 0;
// pQueryShow->index = 0; // pQueryShow->index = 0;
// pQueryShow->connInfo = NULL; // pQueryShow->connInfo = NULL;
...@@ -87,7 +87,7 @@ int32_t mgmtGetQueries(SShowObj *pShow, void *pConn) { ...@@ -87,7 +87,7 @@ int32_t mgmtGetQueries(SShowObj *pShow, void *pConn) {
// pQueryShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfQueries * sizeof(SCDesc *)); // pQueryShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfQueries * sizeof(SCDesc *));
// //
// pConn = pAcct->pConn; // pConn = pAcct->pConn;
// SCMQueryDesc * pQdesc = pQueryShow->qdesc; // SQueryDesc * pQdesc = pQueryShow->qdesc;
// SCDesc * pCDesc = pQueryShow->connInfo; // SCDesc * pCDesc = pQueryShow->connInfo;
// SCDesc **ppCDesc = pQueryShow->cdesc; // SCDesc **ppCDesc = pQueryShow->cdesc;
// //
...@@ -97,7 +97,7 @@ int32_t mgmtGetQueries(SShowObj *pShow, void *pConn) { ...@@ -97,7 +97,7 @@ int32_t mgmtGetQueries(SShowObj *pShow, void *pConn) {
// pCDesc->port = pConn->port; // pCDesc->port = pConn->port;
// strcpy(pCDesc->user, pConn->pUser->user); // strcpy(pCDesc->user, pConn->pUser->user);
// //
// memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SCMQueryDesc) * pConn->pQList->numOfQueries); // memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SQueryDesc) * pConn->pQList->numOfQueries);
// pQdesc += pConn->pQList->numOfQueries; // pQdesc += pConn->pQList->numOfQueries;
// pQueryShow->numOfQueries += pConn->pQList->numOfQueries; // pQueryShow->numOfQueries += pConn->pQList->numOfQueries;
// for (int32_t i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc; // for (int32_t i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc;
...@@ -193,7 +193,7 @@ int32_t mgmtKillQuery(char *qidstr, void *pConn) { ...@@ -193,7 +193,7 @@ int32_t mgmtKillQuery(char *qidstr, void *pConn) {
// while (pConn) { // while (pConn) {
// if (pConn->ip == ip && pConn->port == port && pConn->pQList) { // if (pConn->ip == ip && pConn->port == port && pConn->pQList) {
// int32_t i; // int32_t i;
// SCMQueryDesc *pQDesc = pConn->pQList->qdesc; // SQueryDesc *pQDesc = pConn->pQList->qdesc;
// for (i = 0; i < pConn->pQList->numOfQueries; ++i, ++pQDesc) { // for (i = 0; i < pConn->pQList->numOfQueries; ++i, ++pQDesc) {
// if (pQDesc->queryId == queryId) break; // if (pQDesc->queryId == queryId) break;
// } // }
...@@ -229,7 +229,7 @@ int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -229,7 +229,7 @@ int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pCo
if (rows > pQueryShow->numOfQueries - pQueryShow->index) rows = pQueryShow->numOfQueries - pQueryShow->index; if (rows > pQueryShow->numOfQueries - pQueryShow->index) rows = pQueryShow->numOfQueries - pQueryShow->index;
while (numOfRows < rows) { while (numOfRows < rows) {
SCMQueryDesc *pNode = pQueryShow->qdesc + pQueryShow->index; SQueryDesc *pNode = pQueryShow->qdesc + pQueryShow->index;
SCDesc *pCDesc = pQueryShow->cdesc[pQueryShow->index]; SCDesc *pCDesc = pQueryShow->cdesc[pQueryShow->index];
cols = 0; cols = 0;
...@@ -275,7 +275,7 @@ int32_t mgmtGetStreams(SShowObj *pShow, void *pConn) { ...@@ -275,7 +275,7 @@ int32_t mgmtGetStreams(SShowObj *pShow, void *pConn) {
// //
// pthread_mutex_lock(&pAcct->mutex); // pthread_mutex_lock(&pAcct->mutex);
// //
// pStreamShow = malloc(sizeof(SCMStreamDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow)); // pStreamShow = malloc(sizeof(SStreamDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow));
// pStreamShow->numOfStreams = 0; // pStreamShow->numOfStreams = 0;
// pStreamShow->index = 0; // pStreamShow->index = 0;
// pStreamShow->connInfo = NULL; // pStreamShow->connInfo = NULL;
...@@ -286,7 +286,7 @@ int32_t mgmtGetStreams(SShowObj *pShow, void *pConn) { ...@@ -286,7 +286,7 @@ int32_t mgmtGetStreams(SShowObj *pShow, void *pConn) {
// pStreamShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfStreams * sizeof(SCDesc *)); // pStreamShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfStreams * sizeof(SCDesc *));
// //
// pConn = pAcct->pConn; // pConn = pAcct->pConn;
// SCMStreamDesc * pSdesc = pStreamShow->sdesc; // SStreamDesc * pSdesc = pStreamShow->sdesc;
// SCDesc * pCDesc = pStreamShow->connInfo; // SCDesc * pCDesc = pStreamShow->connInfo;
// SCDesc **ppCDesc = pStreamShow->cdesc; // SCDesc **ppCDesc = pStreamShow->cdesc;
// //
...@@ -296,7 +296,7 @@ int32_t mgmtGetStreams(SShowObj *pShow, void *pConn) { ...@@ -296,7 +296,7 @@ int32_t mgmtGetStreams(SShowObj *pShow, void *pConn) {
// pCDesc->port = pConn->port; // pCDesc->port = pConn->port;
// strcpy(pCDesc->user, pConn->pUser->user); // strcpy(pCDesc->user, pConn->pUser->user);
// //
// memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SCMStreamDesc) * pConn->pSList->numOfStreams); // memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SStreamDesc) * pConn->pSList->numOfStreams);
// pSdesc += pConn->pSList->numOfStreams; // pSdesc += pConn->pSList->numOfStreams;
// pStreamShow->numOfStreams += pConn->pSList->numOfStreams; // pStreamShow->numOfStreams += pConn->pSList->numOfStreams;
// for (int32_t i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc; // for (int32_t i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc;
...@@ -386,7 +386,7 @@ int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -386,7 +386,7 @@ int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pCo
if (rows > pStreamShow->numOfStreams - pStreamShow->index) rows = pStreamShow->numOfStreams - pStreamShow->index; if (rows > pStreamShow->numOfStreams - pStreamShow->index) rows = pStreamShow->numOfStreams - pStreamShow->index;
while (numOfRows < rows) { while (numOfRows < rows) {
SCMStreamDesc *pNode = pStreamShow->sdesc + pStreamShow->index; SStreamDesc *pNode = pStreamShow->sdesc + pStreamShow->index;
SCDesc *pCDesc = pStreamShow->cdesc[pStreamShow->index]; SCDesc *pCDesc = pStreamShow->cdesc[pStreamShow->index];
cols = 0; cols = 0;
...@@ -461,7 +461,7 @@ int32_t mgmtKillStream(char *qidstr, void *pConn) { ...@@ -461,7 +461,7 @@ int32_t mgmtKillStream(char *qidstr, void *pConn) {
// while (pConn) { // while (pConn) {
// if (pConn->ip == ip && pConn->port == port && pConn->pSList) { // if (pConn->ip == ip && pConn->port == port && pConn->pSList) {
// int32_t i; // int32_t i;
// SCMStreamDesc *pSDesc = pConn->pSList->sdesc; // SStreamDesc *pSDesc = pConn->pSList->sdesc;
// for (i = 0; i < pConn->pSList->numOfStreams; ++i, ++pSDesc) { // for (i = 0; i < pConn->pSList->numOfStreams; ++i, ++pSDesc) {
// if (pSDesc->streamId == streamId) break; // if (pSDesc->streamId == streamId) break;
// } // }
......
...@@ -15,7 +15,11 @@ ...@@ -15,7 +15,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "tstatus.h"
#include "tsched.h"
#include "dnodeSystem.h" #include "dnodeSystem.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
...@@ -30,11 +34,6 @@ ...@@ -30,11 +34,6 @@
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtUser.h" #include "mgmtUser.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tstatus.h"
#include "tsched.h"
#include "trpc.h"
#define MAX_LEN_OF_METER_META (sizeof(SMultiMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN) #define MAX_LEN_OF_METER_META (sizeof(SMultiMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN)
...@@ -634,196 +633,190 @@ int32_t mgmtProcessKillConnectionMsg(void *pCont, int32_t contLen, void *ahandle ...@@ -634,196 +633,190 @@ int32_t mgmtProcessKillConnectionMsg(void *pCont, int32_t contLen, void *ahandle
} }
int32_t mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) { int32_t mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) {
// SCMCreateUserMsg *pCreate = (SCMCreateUserMsg *)pMsg; if (mgmtCheckRedirectMsg(ahandle) != 0) {
// int32_t code = 0; return TSDB_CODE_REDIRECT;
// }
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_USER_RSP) != 0) {
// return 0;
// }
//
// if (pConn->superAuth) {
// code = mgmtCreateUser(pConn->pAcct, pCreate->user, pCreate->pass);
// if (code == TSDB_CODE_SUCCESS) {
// mLPrint("user:%s is created by %s", pCreate->user, pConn->pUser->user);
// }
// } else {
// code = TSDB_CODE_NO_RIGHTS;
// }
//
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_USER_RSP, code);
return 0; SUserObj *pUser = mgmtGetUserFromConn(ahandle);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return TSDB_CODE_INVALID_USER;
}
int32_t code;
if (pUser->superAuth) {
SCreateUserMsg *pCreate = pCont;
code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("user:%s is created by %s", pCreate->user, pUser->user);
}
} else {
code = TSDB_CODE_NO_RIGHTS;
}
rpcSendResponse(ahandle, code, NULL, 0);
return code;
} }
int32_t mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) { int32_t mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) {
// SCMAlterUserMsg *pAlter = (SCMAlterUserMsg *)pMsg; if (mgmtCheckRedirectMsg(ahandle) != 0) {
// int32_t code = 0; return TSDB_CODE_REDIRECT;
// SUserObj * pUser; }
// SUserObj * pOperUser;
// SUserObj *pOperUser = mgmtGetUserFromConn(ahandle);
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_ALTER_USER_RSP) != 0) { if (pOperUser == NULL) {
// return 0; rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
// } return TSDB_CODE_INVALID_USER;
// }
// pUser = mgmtGetUser(pAlter->user);
// pOperUser = mgmtGetUser(pConn->pUser->user); SAlterUserMsg *pAlter = pCont;
// SUserObj *pUser = mgmtGetUser(pAlter->user);
// if (pUser == NULL) { if (pUser == NULL) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_ALTER_USER_RSP, TSDB_CODE_INVALID_USER); rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
// return 0; return TSDB_CODE_INVALID_USER;
// } }
//
// if (pOperUser == NULL) { if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_ALTER_USER_RSP, TSDB_CODE_INVALID_USER); rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
// return 0; return TSDB_CODE_NO_RIGHTS;
// } }
//
// if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { int code;
// code = TSDB_CODE_NO_RIGHTS; if ((pAlter->flag & TSDB_ALTER_USER_PASSWD) != 0) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_ALTER_USER_RSP, code); bool hasRight = false;
// return 0; if (strcmp(pOperUser->user, "root") == 0) {
// } hasRight = true;
// } else if (strcmp(pUser->user, pOperUser->user) == 0) {
// if ((pAlter->flag & TSDB_ALTER_USER_PASSWD) != 0) { hasRight = true;
// bool hasRight = false; } else if (pOperUser->superAuth) {
// if (strcmp(pOperUser->user, "root") == 0) { if (strcmp(pUser->user, "root") == 0) {
// hasRight = true; hasRight = false;
// } else if (strcmp(pUser->user, pOperUser->user) == 0) { } else if (strcmp(pOperUser->acct, pUser->acct) != 0) {
// hasRight = true; hasRight = false;
// } else if (pOperUser->superAuth) { } else {
// if (strcmp(pUser->user, "root") == 0) { hasRight = true;
// hasRight = false; }
// } else if (strcmp(pOperUser->acct, pUser->acct) != 0) { }
// hasRight = false;
// } else { if (hasRight) {
// hasRight = true; memset(pUser->pass, 0, sizeof(pUser->pass));
// } taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass);
// } code = mgmtUpdateUser(pUser);
// mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, code);
// if (hasRight) { } else {
// memset(pUser->pass, 0, sizeof(pUser->pass)); code = TSDB_CODE_NO_RIGHTS;
// taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass); }
// code = mgmtUpdateUser(pUser);
// mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pConn->pUser->user, code); rpcSendResponse(ahandle, code, NULL, 0);
// } else { return code;
// code = TSDB_CODE_NO_RIGHTS; }
// }
// if ((pAlter->flag & TSDB_ALTER_USER_PRIVILEGES) != 0) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_ALTER_USER_RSP, code); bool hasRight = false;
// return 0;
// } if (strcmp(pUser->user, "root") == 0) {
// hasRight = false;
// if ((pAlter->flag & TSDB_ALTER_USER_PRIVILEGES) != 0) { } else if (strcmp(pUser->user, pUser->acct) == 0) {
// bool hasRight = false; hasRight = false;
// } else if (strcmp(pOperUser->user, "root") == 0) {
// if (strcmp(pUser->user, "root") == 0) { hasRight = true;
// hasRight = false; } else if (strcmp(pUser->user, pOperUser->user) == 0) {
// } else if (strcmp(pUser->user, pUser->acct) == 0) { hasRight = false;
// hasRight = false; } else if (pOperUser->superAuth) {
// } else if (strcmp(pOperUser->user, "root") == 0) { if (strcmp(pUser->user, "root") == 0) {
// hasRight = true; hasRight = false;
// } else if (strcmp(pUser->user, pOperUser->user) == 0) { } else if (strcmp(pOperUser->acct, pUser->acct) != 0) {
// hasRight = false; hasRight = false;
// } else if (pOperUser->superAuth) { } else {
// if (strcmp(pUser->user, "root") == 0) { hasRight = true;
// hasRight = false; }
// } else if (strcmp(pOperUser->acct, pUser->acct) != 0) { }
// hasRight = false;
// } else { if (pAlter->privilege == 1) { // super
// hasRight = true; hasRight = false;
// } }
// }
// if (hasRight) {
// if (pAlter->privilege == 1) { // super //if (pAlter->privilege == 1) { // super
// hasRight = false; // pUser->superAuth = 1;
// } // pUser->writeAuth = 1;
// //}
// if (hasRight) { if (pAlter->privilege == 2) { // read
// //if (pAlter->privilege == 1) { // super pUser->superAuth = 0;
// // pUser->superAuth = 1; pUser->writeAuth = 0;
// // pUser->writeAuth = 1; }
// //} if (pAlter->privilege == 3) { // write
// if (pAlter->privilege == 2) { // read pUser->superAuth = 0;
// pUser->superAuth = 0; pUser->writeAuth = 1;
// pUser->writeAuth = 0; }
// }
// if (pAlter->privilege == 3) { // write code = mgmtUpdateUser(pUser);
// pUser->superAuth = 0; mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, code);
// pUser->writeAuth = 1; } else {
// } code = TSDB_CODE_NO_RIGHTS;
// }
// code = mgmtUpdateUser(pUser);
// mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pConn->pUser->user, code); rpcSendResponse(ahandle, code, NULL, 0);
// } else { return code;
// code = TSDB_CODE_NO_RIGHTS; }
// }
// code = TSDB_CODE_NO_RIGHTS;
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_ALTER_USER_RSP, code); rpcSendResponse(ahandle, code, NULL, 0);
// return 0; return code;
// }
//
// code = TSDB_CODE_NO_RIGHTS;
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_ALTER_USER_RSP, code);
return 0;
} }
int32_t mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) { int32_t mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) {
// SCMDropUserMsg *pDrop = (SCMDropUserMsg *)pMsg; if (mgmtCheckRedirectMsg(ahandle) != 0) {
// int32_t code = 0; return TSDB_CODE_REDIRECT;
// SUserObj * pUser; }
// SUserObj * pOperUser;
// SUserObj *pOperUser = mgmtGetUserFromConn(ahandle);
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_DROP_USER_RSP) != 0) { if (pOperUser == NULL) {
// return 0; rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
// } return TSDB_CODE_INVALID_USER;
// }
// pUser = mgmtGetUser(pDrop->user);
// pOperUser = mgmtGetUser(pConn->pUser->user); SDropUserMsg *pDrop = pCont;
// SUserObj *pUser = mgmtGetUser(pDrop->user);
// if (pUser == NULL) { if (pUser == NULL) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_USER_RSP, TSDB_CODE_INVALID_USER); rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
// return 0; return TSDB_CODE_INVALID_USER;
// } }
//
// if (pOperUser == NULL) { if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_USER_RSP, TSDB_CODE_INVALID_USER); rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
// return 0; return TSDB_CODE_NO_RIGHTS;
// } }
//
// if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { bool hasRight = false;
// code = TSDB_CODE_NO_RIGHTS; if (strcmp(pUser->user, "root") == 0) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_USER_RSP, code); hasRight = false;
// return 0; } else if (strcmp(pOperUser->user, "root") == 0) {
// } hasRight = true;
// } else if (strcmp(pUser->user, pOperUser->user) == 0) {
// bool hasRight = false; hasRight = false;
// if (strcmp(pUser->user, "root") == 0) { } else if (pOperUser->superAuth) {
// hasRight = false; if (strcmp(pUser->user, "root") == 0) {
// } else if (strcmp(pOperUser->user, "root") == 0) { hasRight = false;
// hasRight = true; } else if (strcmp(pOperUser->acct, pUser->acct) != 0) {
// } else if (strcmp(pUser->user, pOperUser->user) == 0) { hasRight = false;
// hasRight = false; } else {
// } else if (pOperUser->superAuth) { hasRight = true;
// if (strcmp(pUser->user, "root") == 0) { }
// hasRight = false; }
// } else if (strcmp(pOperUser->acct, pUser->acct) != 0) {
// hasRight = false; int32_t code;
// } else { if (hasRight) {
// hasRight = true; code = mgmtDropUser(pUser->pAcct, pDrop->user);
// } if (code == TSDB_CODE_SUCCESS) {
// } mLPrint("user:%s is dropped by %s", pDrop->user, pUser->user);
// }
// if (hasRight) { } else {
// code = mgmtDropUser(pConn->pAcct, pDrop->user); code = TSDB_CODE_NO_RIGHTS;
// if (code == 0) { }
// mLPrint("user:%s is dropped by %s", pDrop->user, pConn->pUser->user);
// } rpcSendResponse(ahandle, code, NULL, 0);
// } else { return code;
// code = TSDB_CODE_NO_RIGHTS;
// }
//
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_USER_RSP, code);
return 0;
} }
int32_t mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) { int32_t mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) {
...@@ -1188,10 +1181,10 @@ int32_t mgmtProcessCfgDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { ...@@ -1188,10 +1181,10 @@ int32_t mgmtProcessCfgDnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
} }
int32_t mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) { int32_t mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) {
SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) pCont; SHeartBeatMsg *pHBMsg = (SHeartBeatMsg *) pCont;
mgmtSaveQueryStreamList(pHBMsg); mgmtSaveQueryStreamList(pHBMsg);
SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(contLen); SHeartBeatRsp *pHBRsp = (SHeartBeatRsp *) rpcMallocCont(contLen);
if (pHBRsp == NULL) { if (pHBRsp == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
rpcFreeCont(pCont); rpcFreeCont(pCont);
...@@ -1225,9 +1218,7 @@ int32_t mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) { ...@@ -1225,9 +1218,7 @@ int32_t mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) {
pHBRsp->streamId = 0; pHBRsp->streamId = 0;
pHBRsp->killConnection = 0; pHBRsp->killConnection = 0;
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pHBRsp, sizeof(SCMHeartBeatMsg)); rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pHBRsp, sizeof(SHeartBeatMsg));
rpcFreeCont(pCont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1247,7 +1238,7 @@ int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secr ...@@ -1247,7 +1238,7 @@ int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secr
} }
static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) { static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) {
SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) pCont; SConnectMsg *pConnectMsg = (SConnectMsg *) pCont;
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
rpcGetConnInfo(thandle, &connInfo); rpcGetConnInfo(thandle, &connInfo);
int32_t code; int32_t code;
...@@ -1284,7 +1275,7 @@ static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle ...@@ -1284,7 +1275,7 @@ static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle
} }
} }
SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp)); SConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SConnectRsp));
if (pConnectRsp == NULL) { if (pConnectRsp == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY; code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto connect_over; goto connect_over;
...@@ -1316,7 +1307,7 @@ connect_over: ...@@ -1316,7 +1307,7 @@ connect_over:
rpcSendResponse(thandle, code, NULL, 0); rpcSendResponse(thandle, code, NULL, 0);
} else { } else {
mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code);
rpcSendResponse(thandle, code, pConnectRsp, sizeof(SCMConnectRsp)); rpcSendResponse(thandle, code, pConnectRsp, sizeof(SConnectRsp));
} }
return code; return code;
...@@ -1404,10 +1395,10 @@ void mgmtInitProcessShellMsg() { ...@@ -1404,10 +1395,10 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg;
} }
int32_t mgmtCheckRedirectMsgImp(void *pConn, int32_t msgType) { int32_t mgmtCheckRedirectMsgImp(void *pConn) {
return 0; return 0;
} }
int32_t (*mgmtCheckRedirectMsg)(void *pConn, int32_t msgType) = mgmtCheckRedirectMsgImp; int32_t (*mgmtCheckRedirectMsg)(void *pConn) = mgmtCheckRedirectMsgImp;
static int32_t mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) { static int32_t mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册