diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 794e41cc6c741848e26af4f0ab88968fbfc26d6a..cce403e2bf826cd430ebeb615ba877b96aae1d2f 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -254,6 +254,8 @@ typedef struct _user_obj { char reserved[16]; char updateEnd[1]; struct _user_obj *prev, *next; + int8_t writeAuth; + int8_t superAuth; } SUserObj; typedef struct { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index ea8045c8515432aa0a9d6e56c259b6eada7757f3..3de7834b3d8d170b9770738c70754ed509d84e33 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -187,6 +187,13 @@ typedef enum { extern char *taosMsg[]; +#define TSDB_MSG_DEF_MAX_MPEERS 5 +#define TSDB_MSG_DEF_VERSION_LEN 64 +#define TSDB_MSG_DEF_DB_LEN 128 +#define TSDB_MSG_DEF_USER_LEN 128 +#define TSDB_MSG_DEF_TABLE_LEN 128 +#define TSDB_MSG_DEF_ACCT_LEN 128 + #pragma pack(push, 1) typedef struct { @@ -325,9 +332,23 @@ typedef struct { } SAlterTableMsg; typedef struct { - char clientVersion[TSDB_VERSION_LEN]; - char db[TSDB_TABLE_ID_LEN]; -} SConnectMsg; + char clientVersion[TSDB_MSG_DEF_VERSION_LEN]; + char msgVersion[TSDB_MSG_DEF_VERSION_LEN]; + char db[TSDB_MSG_DEF_DB_LEN]; +} SCMConnectMsg; + +typedef struct { + char acctId[TSDB_MSG_DEF_ACCT_LEN]; + char serverVersion[TSDB_MSG_DEF_VERSION_LEN]; + int8_t writeAuth; + int8_t superAuth; + int8_t usePublicIp; + int16_t index; + int16_t numOfIps; + uint16_t port; + uint32_t ip[TSDB_MSG_DEF_MAX_MPEERS]; +} SCMConnectRsp; + typedef struct { int32_t maxUsers; @@ -360,13 +381,6 @@ typedef struct { char db[TSDB_TABLE_ID_LEN]; } SMgmtHead; -typedef struct { - char acctId[TSDB_ACCT_LEN]; - char version[TSDB_VERSION_LEN]; - char writeAuth; - char superAuth; -} SConnectRsp; - typedef struct { short vnode; int32_t sid; diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 0e4e43f76aad0e802841b10cc5a3fd745bf74de3..100d3a6cac7986b24bc50b1e2ba520b18fc8fb26 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -46,8 +46,9 @@ typedef struct { } SRpcIpSet; typedef struct { - uint32_t sourceIp; - uint16_t sourcePort; + uint32_t clientIp; + uint16_t clientPort; + uint32_t serverIp; char *user; } SRpcConnInfo; diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index e1181ff0f347ab8234353ae3774f6eb3c4d72932..529a4396045cb0b17528c07c8e01d1ec4ba2713e 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -24,7 +24,7 @@ extern "C" { #include #include "mnode.h" -int mgmtInitShell(); +int32_t mgmtInitShell(); void mgmtCleanUpShell(); extern int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 1ddf6500ee6a5f3f9892915b9fa7129a901b03b5..f46d0e9a378fd2d7a8d85d269a20306596d514e0 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -46,32 +46,32 @@ static void mgmtInitShowMsgFp(); void * tsShellConn = NULL; SConnObj *connList; -void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code); -int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); -int (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int, SConnObj *); +void mgmtProcessMsgFromShell(char type, void *pCont, int32_t contLen, void *ahandle, int32_t code); +int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); +int32_t (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int32_t, SConnObj *); void mgmtInitProcessShellMsg(); -int mgmtRedirectMsg(SConnObj *pConn, int msgType); -int mgmtKillQuery(char *queryId, SConnObj *pConn); +int32_t mgmtRedirectMsg(SConnObj *pConn, int32_t msgType); +int32_t mgmtKillQuery(char *queryId, SConnObj *pConn); void mgmtProcessTranRequest(SSchedMsg *pSchedMsg) { SIntMsg * pMsg = (SIntMsg *)(pSchedMsg->msg); SConnObj *pConn = (SConnObj *)(pSchedMsg->thandle); char *cont = (char *)pMsg->content + sizeof(SMgmtHead); - int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); + int32_t contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); if (pConn->pAcct) (*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn); if (pSchedMsg->msg) free(pSchedMsg->msg); } -int mgmtInitShell() { +int32_t mgmtInitShell() { SRpcInit rpcInit; mgmtInitProcessShellMsg(); mgmtInitShowMsgFp(); - int size = sizeof(SConnObj) * tsMaxShellConns; + int32_t size = sizeof(SConnObj) * tsMaxShellConns; connList = (SConnObj *)malloc(size); if (connList == NULL) { mError("failed to malloc for connList to shell"); @@ -79,7 +79,7 @@ int mgmtInitShell() { } memset(connList, 0, size); - int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0; + int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0; if (numOfThreads < 1) numOfThreads = 1; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -113,7 +113,7 @@ void mgmtCleanUpShell() { static void mgmtSetSchemaFromMeters(SSchema *pSchema, STabObj *pMeterObj, uint32_t numOfCols) { SSchema *pMeterSchema = (SSchema *)(pMeterObj->schema); - for (int i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { pSchema->type = pMeterSchema[i].type; strcpy(pSchema->name, pMeterSchema[i].name); pSchema->bytes = htons(pMeterSchema[i].bytes); @@ -180,7 +180,7 @@ bool mgmtCheckMeterMetaMsgType(char *pMsg) { return 0; } -int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; // STabObj * pMeterObj = NULL; // SVgObj * pVgroup = NULL; @@ -191,7 +191,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { // // pInfo->createFlag = htons(pInfo->createFlag); // -// int size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + +// int32_t size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + // sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN + TSDB_EXTRA_PAYLOAD_SIZE; // // SDbObj *pDb = NULL; @@ -320,7 +320,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { // pRsp->code = TSDB_CODE_INVALID_TABLE; // goto _exit_code; // } -// for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { +// for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { // if (pConn->usePublicIp) { // pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; // pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); @@ -352,7 +352,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { * | | | * pStart pCurMeter pTail **/ -int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessMultiMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SDbObj * pDbObj = NULL; // STabObj * pMeterObj = NULL; // SVgObj * pVgroup = NULL; @@ -365,7 +365,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { // char * str = pMsg + sizeof(SMultiMeterInfoMsg); // pInfo->numOfMeters = htonl(pInfo->numOfMeters); // -// int size = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice +// int32_t size = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice // // char *pNewMsg; // if ((pStart = mgmtForMultiAllocMsg(pConn, size, &pNewMsg, &pRsp)) == NULL) { @@ -465,7 +465,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { // goto _error_exit_code; // } // -// for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { +// for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { // if (pConn->usePublicIp) { // pMeta->meta.vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; // pMeta->meta.vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); @@ -507,7 +507,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessMetricMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SSuperTableMetaMsg *pSuperTableMetaMsg = (SSuperTableMetaMsg *)pMsg; // STabObj * pMetric; // STaosRsp * pRsp; @@ -558,9 +558,9 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessCreateDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SCreateDbMsg *pCreate = (SCreateDbMsg *)pMsg; -// int code = 0; +// int32_t code = 0; // // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_DB_RSP) != 0) { // return 0; @@ -593,14 +593,14 @@ int mgmtProcessCreateDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessCreateMnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateMnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // return rpcSendResponse(pConn->thandle, TSDB_MSG_TYPE_CREATE_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return 0; } -int mgmtProcessAlterDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessAlterDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SAlterDbMsg *pAlter = (SAlterDbMsg *)pMsg; -// int code = 0; +// int32_t code = 0; // // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_ALTER_DB_RSP) != 0) { // return 0; @@ -624,8 +624,8 @@ int mgmtProcessAlterDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessKillQueryMsg(char *pMsg, int msgLen, SConnObj *pConn) { -// int code = 0; +int32_t mgmtProcessKillQueryMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +// int32_t code = 0; // SKillQuery *pKill = (SKillQuery *)pMsg; // // if (!pConn->writeAuth) { @@ -639,8 +639,8 @@ int mgmtProcessKillQueryMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessKillStreamMsg(char *pMsg, int msgLen, SConnObj *pConn) { -// int code = 0; +int32_t mgmtProcessKillStreamMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +// int32_t code = 0; // SKillStream *pKill = (SKillStream *)pMsg; // // if (!pConn->writeAuth) { @@ -654,8 +654,8 @@ int mgmtProcessKillStreamMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessKillConnectionMsg(char *pMsg, int msgLen, SConnObj *pConn) { -// int code = 0; +int32_t mgmtProcessKillConnectionMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +// int32_t code = 0; // SKillConnection *pKill = (SKillConnection *)pMsg; // // if (!pConn->superAuth) { @@ -669,9 +669,9 @@ int mgmtProcessKillConnectionMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessCreateUserMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SCreateUserMsg *pCreate = (SCreateUserMsg *)pMsg; -// int code = 0; +// int32_t code = 0; // // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_USER_RSP) != 0) { // return 0; @@ -691,9 +691,9 @@ int mgmtProcessCreateUserMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessAlterUserMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessAlterUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SAlterUserMsg *pAlter = (SAlterUserMsg *)pMsg; -// int code = 0; +// int32_t code = 0; // SUserObj * pUser; // SUserObj * pOperUser; // @@ -803,9 +803,9 @@ int mgmtProcessAlterUserMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessDropUserMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessDropUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SDropUserMsg *pDrop = (SDropUserMsg *)pMsg; -// int code = 0; +// int32_t code = 0; // SUserObj * pUser; // SUserObj * pOperUser; // @@ -862,9 +862,9 @@ int mgmtProcessDropUserMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessDropDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessDropDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SDropDbMsg *pDrop = (SDropDbMsg *)pMsg; -// int code; +// int32_t code; // // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_DROP_DB_RSP) != 0) { // return 0; @@ -883,9 +883,9 @@ int mgmtProcessDropDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessUseDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessUseDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SUseDbMsg *pUse = (SUseDbMsg *)pMsg; -// int code; +// int32_t code; // // code = mgmtUseDb(pConn, pUse->db); // if (code == 0) mTrace("DB is change to:%s by %s", pUse->db, pConn->pUser->user); @@ -933,11 +933,11 @@ static void mgmtInitShowMsgFp() { mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes; } -int mgmtProcessShowMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessShowMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SShowMsg * pShowMsg = (SShowMsg *)pMsg; // STaosRsp * pRsp; // char * pStart; -// int code = 0; +// int32_t code = 0; // SShowRspMsg *pShowRsp; // SShowObj * pShow = NULL; // @@ -947,7 +947,7 @@ int mgmtProcessShowMsg(char *pMsg, int msgLen, SConnObj *pConn) { // } // } // -// int size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SShowRspMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS + +// int32_t size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SShowRspMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS + // TSDB_EXTRA_PAYLOAD_SIZE; // pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_SHOW_RSP, size); // if (pStart == NULL) { @@ -992,12 +992,12 @@ int mgmtProcessShowMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessRetrieveMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SRetrieveMeterMsg *pRetrieve; // SRetrieveMeterRsp *pRsp; -// int rowsToRead = 0, size = 0, rowsRead = 0; +// int32_t rowsToRead = 0, size = 0, rowsRead = 0; // char * pStart; -// int code = 0; +// int32_t code = 0; // SShowObj * pShow; // // pRetrieve = (SRetrieveMeterMsg *)pMsg; @@ -1080,9 +1080,9 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SCreateTableMsg *pCreate = (SCreateTableMsg *)pMsg; -// int code; +// int32_t code; // SSchema * pSchema; // // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_TABLE_RSP) != 0) { @@ -1097,7 +1097,7 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { // // pCreate->sqlLen = htons(pCreate->sqlLen); // pSchema = pCreate->schema; -// for (int i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { +// for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { // pSchema->bytes = htons(pSchema->bytes); // pSchema->colId = i; // pSchema++; @@ -1135,9 +1135,9 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessDropTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessDropTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SDropTableMsg *pDrop = (SDropTableMsg *)pMsg; -// int code; +// int32_t code; // // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_DROP_TABLE_RSP) != 0) { // return 0; @@ -1161,9 +1161,9 @@ int mgmtProcessDropTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessAlterTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { +int32_t mgmtProcessAlterTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // SAlterTableMsg *pAlter = (SAlterTableMsg *)pMsg; -// int code; +// int32_t code; // // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_ALTER_TABLE_RSP) != 0) { // return 0; @@ -1202,8 +1202,8 @@ int mgmtProcessAlterTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) { -// int code = 0; +int32_t mgmtProcessCfgDnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +// int32_t code = 0; // SCfgMsg *pCfg = (SCfgMsg *)pMsg; // // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CFG_MNODE_RSP) != 0) { @@ -1223,9 +1223,9 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) { // return 0; //} // -//int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { +//int32_t mgmtProcessHeartBeatMsg(char *cont, int32_t contLen, SConnObj *pConn) { // char * pStart, *pMsg; -// int msgLen; +// int32_t msgLen; // STaosRsp *pRsp; // // mgmtSaveQueryStreamList(cont, contLen, pConn); @@ -1247,7 +1247,7 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) { // // if (pConn->usePublicIp) { // if (pSdbPublicIpList != NULL) { -// int size = pSdbPublicIpList->numOfIps * 4; +// int32_t size = pSdbPublicIpList->numOfIps * 4; // pHBRsp->ipList.numOfIps = pSdbPublicIpList->numOfIps; // memcpy(pHBRsp->ipList.ip, pSdbPublicIpList->ip, size); // pMsg += sizeof(SHeartBeatRsp) + size; @@ -1258,7 +1258,7 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) { // // } else { // if (pSdbIpList != NULL) { -// int size = pSdbIpList->numOfIps * 4; +// int32_t size = pSdbIpList->numOfIps * 4; // pHBRsp->ipList.numOfIps = pSdbIpList->numOfIps; // memcpy(pHBRsp->ipList.ip, pSdbIpList->ip, size); // pMsg += sizeof(SHeartBeatRsp) + size; @@ -1296,132 +1296,102 @@ void mgmtEstablishConn(SConnObj *pConn) { // mgmtAddConnIntoAcct(pConn); } -int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - SUserObj *pUser = NULL; - - *spi = 0; - *encrypt = 0; - secret[0] = 0; - ckey[0] = 0; - - pUser = mgmtGetUser(user); - if (pUser == NULL) return TSDB_CODE_INVALID_USER; +int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { + SUserObj *pUser = mgmtGetUser(user); + if (pUser == NULL) { + *spi = 0; + *encrypt = 0; + *ckey = 0; + *secret = 0; + return TSDB_CODE_INVALID_USER; + } *spi = 1; *encrypt = 0; + *ckey = 0; memcpy(secret, pUser->pass, TSDB_KEY_LEN); - - return 0; + return TSDB_CODE_SUCCESS; } -int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) { -// STaosRsp * pRsp; -// SConnectRsp *pConnectRsp; -// SConnectMsg *pConnectMsg; -// char * pStart; -// int code = TSDB_CODE_INVALID_USER; -// SAcctObj * pAcct = NULL; -// SUserObj * pUser = NULL; -// SDbObj * pDb = NULL; -// char dbName[256] = {0}; -// -// pConnectMsg = (SConnectMsg *)pMsg; -// -// pUser = mgmtGetUser(pConn->user); -// if (pUser == NULL) { -// code = TSDB_CODE_INVALID_USER; -// goto _rsp; -// } -// -// if (mgmtCheckExpired()) { -// code = TSDB_CODE_GRANT_EXPIRED; -// goto _rsp; -// } -// -// pAcct = mgmtGetAcct(pUser->acct); -// -// code = taosCheckVersion(pConnectMsg->clientVersion, version, 3); -// if (code != 0) { -// mError("invalid client version:%s", pConnectMsg->clientVersion); -// goto _rsp; -// } -// -// if (pConnectMsg->db[0]) { -// sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db); -// pDb = mgmtGetDb(dbName); -// if (pDb == NULL) { -// code = TSDB_CODE_INVALID_DB; -// goto _rsp; -// } -// } -// -// if (pConn->pAcct) { -// mgmtRemoveConnFromAcct(pConn); -// atomic_fetch_sub_32(&mgmtShellConns, 1); -// atomic_fetch_sub_32(&sdbExtConns, 1); -// } -// -// code = 0; -// pConn->pAcct = pAcct; -// pConn->pDb = pDb; -// pConn->pUser = pUser; -// mgmtEstablishConn(pConn); -// -//_rsp: -// pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_CONNECT_RSP, 128); -// if (pStart == NULL) return 0; -// -// pMsg = pStart; -// pRsp = (STaosRsp *)pMsg; -// pRsp->code = code; -// pMsg += sizeof(STaosRsp); -// -// if (code == 0) { -// pConnectRsp = (SConnectRsp *)pRsp->more; -// sprintf(pConnectRsp->acctId, "%x", pConn->pAcct->acctId); -// strcpy(pConnectRsp->version, version); -// pConnectRsp->writeAuth = pConn->writeAuth; -// pConnectRsp->superAuth = pConn->superAuth; -// pMsg += sizeof(SConnectRsp); -// -// int size; -// if (pSdbPublicIpList != NULL && pSdbIpList != NULL) { -// size = pSdbPublicIpList->numOfIps * 4 + sizeof(SIpList); -// if (pConn->usePublicIp) { -// memcpy(pMsg, pSdbPublicIpList, size); -// } else { -// memcpy(pMsg, pSdbIpList, size); -// } -// } else { -// SIpList tmpIpList; -// tmpIpList.numOfIps = 0; -// size = tmpIpList.numOfIps * 4 + sizeof(SIpList); -// memcpy(pMsg, &tmpIpList, size); -// } -// -// pMsg += size; -// -// // set the time resolution: millisecond or microsecond -// *((uint32_t *)pMsg) = tsTimePrecision; -// pMsg += sizeof(uint32_t); -// -// } else { -// pConn->pAcct = NULL; -// pConn->pUser = NULL; -// } -// -// msgLen = pMsg - pStart; -// taosSendMsgToPeer(pConn->thandle, pStart, msgLen); -// -// char ipstr[24]; -// tinet_ntoa(ipstr, pConn->ip); -// mLPrint("user:%s login from %s, code:%d", pConn->user, ipstr, code); -// -// return code; - return 0; +int32_t mgmtProcessConnectMsg(int8_t type, void *pCont, int32_t contLen, void *ahandle, int32_t code) { + SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) pCont; + uint32_t destIp = 0; + uint32_t srcIp = 0; + + SUserObj *pUser = mgmtGetUser(pConnectMsg->head.userId); + if (pUser == NULL) { + mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return TSDB_CODE_INVALID_USER; + } + + if (mgmtCheckExpired()) { + mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code); + rpcSendResponse(ahandle, TSDB_CODE_GRANT_EXPIRED, NULL, 0); + return TSDB_CODE_GRANT_EXPIRED; + } + + SAcctObj *pAcct = mgmtGetAcct(pUser->acct); + if (pAcct == NULL) { + mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_ACCT, NULL, 0); + return TSDB_CODE_INVALID_ACCT; + } + + code = taosCheckVersion(pConnectMsg->clientVersion, version, 3); + if (code != TSDB_CODE_SUCCESS) { + mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code); + rpcSendResponse(ahandle, code, NULL, 0); + return code; + } + + if (pConnectMsg->db[0]) { + char dbName[TSDB_TABLE_ID_LEN] = {0}; + sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db); + SDbObj *pDb = mgmtGetDb(dbName); + if (pDb == NULL) { + mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_DB, NULL, 0); + return TSDB_CODE_INVALID_DB; + } + } + + SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp)); + if (pConnectRsp == NULL) { + mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code); + rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); + strcpy(pConnectRsp->serverVersion, version); + pConnectRsp->writeAuth = pConn->writeAuth; + pConnectRsp->superAuth = pConn->superAuth; + + pConnectRsp->index = 0; + pConnectRsp->usePublicIp = (destIp == tsPublicIpInt ? 1 : 0); + if (pSdbPublicIpList != NULL && pSdbIpList != NULL) { + pConnectRsp->numOfIps = htons(pSdbPublicIpList->numOfIps); + pConnectRsp->port = htons(tsMgmtShellPort); + if (pConnectRsp->usePublicIp) { + for (int i = 0; i < pSdbPublicIpList->numOfIps; ++i) { + pConnectRsp->ip[i] = htonl(pSdbPublicIpList->ip[i]); + } + } else { + for (int i = 0; i < pSdbIpList->numOfIps; ++i) { + pConnectRsp->ip[i] = htonl(pSdbIpList->ip[i]); + } + } + } else { + pConnectRsp->numOfIps = 0; + pConnectRsp->port = htons(tsMgmtShellPort); + } + + mLPrint("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code); + return TSDB_CODE_SUCCESS; } -void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code) { +void mgmtProcessMsgFromShell(char type, void *pCont, int32_t contLen, void *ahandle, int32_t code) { // SIntMsg * pMsg = (SIntMsg *)msg; // SConnObj *pConn = (SConnObj *)ahandle; // @@ -1472,7 +1442,7 @@ void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, // } // // char *cont = (char *)pMsg->content + sizeof(SMgmtHead); -// int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); +// int32_t contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); // // // read-only request can be executed concurrently // if ((pMsg->msgType == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(cont))) || diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h index 3f7efa9f46f96d97bc95281a3d0176fa76dec7e1..c157e3ba300c474369cec80c3459a3108ea75731 100644 --- a/src/rpc/inc/rpcHead.h +++ b/src/rpc/inc/rpcHead.h @@ -32,6 +32,7 @@ typedef struct { uint32_t uid; // for unique ID inside a client uint32_t sourceId; // source ID, an index for connection list uint32_t destId; // destination ID, an index for connection list + uint32_t destIp; // destination IP address, for NAT scenario char user[TSDB_UNI_LEN]; uint16_t port; // for UDP only, port may be changed char empty[1]; // reserved diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 32e41ccfdd86e9200298c67651e5c73df7a156ff..1b250a0f3fda8f4d885a20bc72912ebddf3ca7d2 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -94,6 +94,7 @@ typedef struct _RpcConn { uint16_t localPort; // for UDP only uint32_t peerUid; // peer UID uint32_t peerIp; // peer IP + uint32_t destIp; // server destination IP to handle NAT uint16_t peerPort; // peer port char peerIpstr[TSDB_IPv4ADDR_LEN]; // peer IP string uint16_t tranId; // outgoing transcation ID, for build message @@ -389,8 +390,9 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { SRpcConn *pConn = (SRpcConn *)thandle; SRpcInfo *pRpc = pConn->pRpc; - pInfo->sourceIp = pConn->peerIp; - pInfo->sourcePort = pConn->peerPort; + pInfo->clientIp = pConn->peerIp; + pInfo->clientPort = pConn->peerPort; + pInfo->serverIp = pConn->destIp; strcpy(pInfo->user, pConn->user); } @@ -546,6 +548,7 @@ SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) { char ipstr[20] = {0}; tinet_ntoa(ipstr, ipSet.ip[ipSet.index]); pConn = rpcOpenConn(pRpc, ipstr, ipSet.port); + pConn->destIp = ipSet.ip[ipSet.index]; } return pConn; @@ -772,11 +775,13 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; + pHead = rpcDecompressRpcMsg(pHead); - int contLen = rpcContLenFromMsg(pHead->msgLen); - uint8_t *pCont = pHead->content; + int contLen = rpcContLenFromMsg(pHead->msgLen); + uint8_t *pCont = pHead->content; if ( rpcIsReq(pHead->msgType) ) { + pConn->destIp = pHead->destIp; taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); (*(pRpc->cfp))(pHead->msgType, pCont, contLen, pConn, 0); } else { @@ -886,6 +891,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pHead->tranId = pConn->tranId; pHead->sourceId = pConn->ownId; pHead->destId = pConn->peerId; + pHead->destIp = pConn->destIp; pHead->port = 0; pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); memcpy(pHead->user, pConn->user, tListLen(pHead->user));