diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index ec839e25758793f7ada6a12684274903f6bfdfa0..8753142c301047cde62bc88837fd685289d55367 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -325,7 +325,7 @@ typedef struct _sql_obj { int64_t stime; uint32_t queryId; void * thandle; - SRpcIpSet ipSet; + SRpcIpSet *ipSet; void * pStream; void * pSubscription; char * sqlstr; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index c6abfabf93370bac4995b6d8f6384b4fc98a6273..b7c76239981b064b340f20d47b18d4020f77c015 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -209,16 +209,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { } char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { - SQList *pQList = (SQList *)pMsg; + SCMQqueryList *pQList = (SCMQqueryList *)pMsg; char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256; - SQDesc *pQdesc = pQList->qdesc; + SCMQueryDesc *pQdesc = pQList->qdesc; pQList->numOfQueries = 0; // We extract the lock to tscBuildHeartBeatMsg function. /* pthread_mutex_lock (&pObj->mutex); */ - pMsg += sizeof(SQList); + pMsg += sizeof(SCMQqueryList); SSqlObj *pSql = pObj->sqlList; while (pSql) { /* @@ -239,15 +239,15 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { pQList->numOfQueries++; pQdesc++; pSql = pSql->next; - pMsg += sizeof(SQDesc); + pMsg += sizeof(SCMQueryDesc); if (pMsg > pMax) break; } - SSList *pSList = (SSList *)pMsg; - SSDesc *pSdesc = pSList->sdesc; + SCMStreamList *pSList = (SCMStreamList *)pMsg; + SCMStreamDesc *pSdesc = pSList->sdesc; pSList->numOfStreams = 0; - pMsg += sizeof(SSList); + pMsg += sizeof(SCMStreamList); SSqlStream *pStream = pObj->streamList; while (pStream) { strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); @@ -265,7 +265,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { pSList->numOfStreams++; pSdesc++; pStream = pStream->next; - pMsg += sizeof(SSDesc); + pMsg += sizeof(SCMStreamDesc); if (pMsg > pMax) break; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index fb68f6dfaf5c6192714514989a1ab92412a03899..ac760cc2969f13179167c56766fe9e6e5f33ed5f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -31,12 +31,10 @@ #define TSC_MGMT_VNODE 999 -SRpcIpSet tscMgmtIpList; int tsMasterIndex = 0; int tsSlaveIndex = 1; -//temp -SRpcIpSet tscMgmtIpSet; +SRpcIpSet tscMgmtIpList; SRpcIpSet tscDnodeIpSet; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -62,29 +60,27 @@ void tscPrintMgmtIp() { } } -void tscSetMgmtIpListFromCluster(SIpList *pIpList) { - tscMgmtIpList.numOfIps = pIpList->numOfIps; - if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) { - for (int i = 0; i < pIpList->numOfIps; ++i) { - //tinet_ntoa(tscMgmtIpList.ipStr[i], pIpList->ip[i]); - tscMgmtIpList.ip[i] = pIpList->ip[i]; - } - tscTrace("cluster mgmt IP list:"); - tscPrintMgmtIp(); +void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) { + tscMgmtIpList.numOfIps = htons(pIpList->numOfIps); + tscMgmtIpList.index = htons(pIpList->index); + tscMgmtIpList.port = htons(pIpList->port); + for (int32_t i = 0; i ip[i]; } } void tscSetMgmtIpListFromEdge() { - if (tscMgmtIpList.numOfIps != 2) { - tscMgmtIpList.numOfIps = 2; + if (tscMgmtIpList.numOfIps != 1) { + tscMgmtIpList.numOfIps = 1; + tscMgmtIpList.index = 0; + tscMgmtIpList.port = tsMgmtShellPort; tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); - tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); tscTrace("edge mgmt IP list:"); tscPrintMgmtIp(); } } -void tscSetMgmtIpList(SIpList *pIpList) { +void tscSetMgmtIpList(SRpcIpSet *pIpList) { /* * The iplist returned by the cluster edition is the current management nodes * and the iplist returned by the edge edition is empty @@ -120,8 +116,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SSqlRes *pRes = &pSql->res; if (code == 0) { - SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp; - SIpList * pIpList = &pRsp->ipList; + SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; + SRpcIpSet * pIpList = &pRsp->ipList; tscSetMgmtIpList(pIpList); if (pRsp->killConnection) { @@ -296,7 +292,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle); //TODO fetch from vpeerdesc - pSql->ipSet = tscMgmtIpSet; + pSql->ipSet = &tscMgmtIpList; break; } @@ -364,17 +360,17 @@ int tscSendMsgToServer(SSqlObj *pSql) { } void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) { - SIpList *pIpList = (SIpList *)(cont); - tscSetMgmtIpList(pIpList); - - if (pSql->cmd.command < TSDB_SQL_READ) { - tsMasterIndex = 0; - pSql->index = 0; - } else { - pSql->index++; - } - - tscPrintMgmtIp(); +// SIpList *pIpList = (SIpList *)(cont); +// tscSetMgmtIpList(pIpList); +// +// if (pSql->cmd.command < TSDB_SQL_READ) { +// tsMasterIndex = 0; +// pSql->index = 0; +// } else { +// pSql->index++; +// } +// +// tscPrintMgmtIp(); } void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { @@ -2884,18 +2880,18 @@ int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) { STscObj *pObj = pSql->pTscObj; size += tsRpcHeadSize + sizeof(SMgmtHead); - size += sizeof(SQList); + size += sizeof(SCMQqueryList); SSqlObj *tpSql = pObj->sqlList; while (tpSql) { - size += sizeof(SQDesc); + size += sizeof(SCMQueryDesc); tpSql = tpSql->next; } - size += sizeof(SSList); + size += sizeof(SCMStreamList); SSqlStream *pStream = pObj->streamList; while (pStream) { - size += sizeof(SSDesc); + size += sizeof(SCMStreamDesc); pStream = pStream->next; } @@ -3323,10 +3319,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) { assert(len <= tListLen(pObj->db)); strncpy(pObj->db, temp, tListLen(pObj->db)); - SIpList * pIpList; - char *rsp = pRes->pRsp + sizeof(SCMConnectRsp); - pIpList = (SIpList *)rsp; - tscSetMgmtIpList(pIpList); +// SIpList * pIpList; +// char *rsp = pRes->pRsp + sizeof(SCMConnectRsp); +// pIpList = (SIpList *)rsp; +// tscSetMgmtIpList(pIpList); strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 794e41cc6c741848e26af4f0ab88968fbfc26d6a..2cecaef6adc3a74f78c05a6988e8e56a1bbb0a5d 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -306,8 +306,8 @@ typedef struct _connObj { uint32_t ip; // shell IP uint16_t port; // shell port void * thandle; - SQList * pQList; // query list - SSList * pSList; // stream list + SCMQqueryList * pQList; // query list + SCMStreamList * pSList; // stream list uint64_t qhandle; struct _connObj *prev, *next; } SConnObj; diff --git a/src/inc/sdb.h b/src/inc/sdb.h index 7316fd7ef5275b8edc4a2a32ae6aaa543e78a9bd..d48adc5bbd0b1550a6aaafc5ee58e787ead3ad85 100644 --- a/src/inc/sdb.h +++ b/src/inc/sdb.h @@ -37,8 +37,8 @@ extern int sdbExtConns; extern int sdbMaster; extern uint32_t sdbPublicIp; extern uint32_t sdbMasterStartTime; -extern SIpList *pSdbIpList; -extern SIpList *pSdbPublicIpList; +extern SRpcIpSet *pSdbIpList; +extern SRpcIpSet *pSdbPublicIpList; extern void (*sdbWorkAsMasterCallback)(); // this function pointer will be set by taosd @@ -71,8 +71,6 @@ enum _sdbaction { SDB_MAX_ACTION_TYPES }; -#ifdef CLUSTER - #define SDB_MAX_PEERS 4 typedef struct { uint32_t ip; @@ -103,8 +101,6 @@ extern SSdbPeer *sdbPeer[]; #define sdbInited (sdbPeer[0]) #define sdbStatus (sdbPeer[0]->status) -#endif - void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory, void *(*appTool)(char, void *, char *, int, int *)); @@ -138,6 +134,7 @@ int sdbCfgNode(char *cont); int64_t sdbGetVersion(); +int32_t sdbGetRunStatus(); #define TSDB_MAX_TABLES 1000 extern void* tsChildTableSdb; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index bf4787c449e6796d1b28e8a728a87d2be69ebe28..2f5ceb72d11cddb513c3cafd2dd3c2775918dd0a 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -26,6 +26,7 @@ extern "C" { #include "taosdef.h" #include "taoserror.h" #include "taosdef.h" +#include "trpc.h" // message type #define TSDB_MSG_TYPE_REG 1 @@ -187,25 +188,8 @@ typedef enum { extern char *taosMsg[]; -#define TSDB_MSG_DEF_MAX_MPEERS 5 -#define TSDB_MSG_DEF_VERSION_LEN 64 -#define TSDB_MSG_DEF_DB_LEN 128 -#define TSDB_MSG_DEF_USER_LEN 128 -#define TSDB_MSG_DEF_TABLE_LEN 128 -#define TSDB_MSG_DEF_ACCT_LEN 128 - #pragma pack(push, 1) -typedef struct { - char numOfIps; - uint32_t ip[]; -} SIpList; - -typedef struct { - char numOfIps; - uint32_t ip[TSDB_MAX_MGMT_IPS]; -} SMgmtIpList; - typedef struct { uint32_t customerId; uint32_t osId; @@ -332,20 +316,17 @@ typedef struct { } SAlterTableMsg; typedef struct { - char clientVersion[TSDB_MSG_DEF_VERSION_LEN]; - char msgVersion[TSDB_MSG_DEF_VERSION_LEN]; - char db[TSDB_MSG_DEF_DB_LEN]; + char clientVersion[TSDB_VERSION_LEN]; + char msgVersion[TSDB_VERSION_LEN]; + char db[TSDB_TABLE_ID_LEN]; } SCMConnectMsg; typedef struct { - char acctId[TSDB_MSG_DEF_ACCT_LEN]; - char serverVersion[TSDB_MSG_DEF_VERSION_LEN]; - int8_t writeAuth; - int8_t superAuth; - int16_t index; - int16_t numOfIps; - uint16_t port; - uint32_t ip[TSDB_MSG_DEF_MAX_MPEERS]; + char acctId[TSDB_ACCT_LEN]; + char serverVersion[TSDB_VERSION_LEN]; + int8_t writeAuth; + int8_t superAuth; + SRpcIpSet ipList; } SCMConnectRsp; typedef struct { @@ -799,19 +780,12 @@ typedef struct { char config[60]; } SCfgMsg; -typedef struct { - uint32_t queryId; - uint32_t streamId; - char killConnection; - SIpList ipList; -} SHeartBeatRsp; - typedef struct { char sql[TSDB_SHOW_SQL_LEN]; uint32_t queryId; int64_t useconds; int64_t stime; -} SQDesc; +} SCMQueryDesc; typedef struct { char sql[TSDB_SHOW_SQL_LEN]; @@ -822,17 +796,29 @@ typedef struct { int64_t stime; int64_t slidingTime; int64_t interval; -} SSDesc; +} SCMStreamDesc; typedef struct { int32_t numOfQueries; - SQDesc qdesc[]; -} SQList; + SCMQueryDesc qdesc[]; +} SCMQqueryList; typedef struct { int32_t numOfStreams; - SSDesc sdesc[]; -} SSList; + SCMStreamDesc sdesc[]; +} SCMStreamList; + +typedef struct { + SCMQqueryList qlist; + SCMStreamList slist; +} SCMHeartBeatMsg; + +typedef struct { + uint32_t queryId; + uint32_t streamId; + int8_t killConnection; + SRpcIpSet ipList; +} SCMHeartBeatRsp; typedef struct { uint64_t handle; diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 6d5a08c4bb51d827f16b4b83f85f2bc1a23241a5..d9155131d7313b7a7bf496aac1d24290c9cb4d09 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -22,21 +22,21 @@ extern "C" { #include "mnode.h" -int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); +int32_t mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); +int32_t mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int mgmtRetrieveQueries(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); -int mgmtRetrieveStreams(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); -int mgmtSaveQueryStreamList(char *cont, int contLen, SConnObj *pConn); +int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg); -int mgmtKillQuery(char *qidstr, SConnObj *pConn); +int32_t mgmtKillQuery(char *qidstr, SConnObj *pConn); -int mgmtKillStream(char *qidstr, SConnObj *pConn); +int32_t mgmtKillStream(char *qidstr, SConnObj *pConn); -int mgmtKillConnection(char *qidstr, SConnObj *pConn); +int32_t mgmtKillConnection(char *qidstr, SConnObj *pConn); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index 529a4396045cb0b17528c07c8e01d1ec4ba2713e..dfb3e001485b7a575a3d7d8b8466bc5771fcffac 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -28,13 +28,13 @@ int32_t mgmtInitShell(); void mgmtCleanUpShell(); extern int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType); -extern int32_t (*mgmtProcessAlterAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); -extern int32_t (*mgmtProcessCreateDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); -extern int32_t (*mgmtProcessCfgMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); -extern int32_t (*mgmtProcessDropMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); -extern int32_t (*mgmtProcessDropDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); -extern int32_t (*mgmtProcessDropAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); -extern int32_t (*mgmtProcessCreateAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); +extern int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle); +extern int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); +extern int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); +extern int32_t (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); +extern int32_t (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); +extern int32_t (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle); +extern int32_t (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 298767d2eae8b674141f5a529929bb338b01b60d..f23ac1fd9f97f27bfaac9d9fbba2d7a465a81769 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -28,55 +28,55 @@ typedef struct { } SCDesc; typedef struct { - int index; - int numOfQueries; + int32_t index; + int32_t numOfQueries; SCDesc * connInfo; SCDesc **cdesc; - SQDesc qdesc[]; + SCMQueryDesc qdesc[]; } SQueryShow; typedef struct { - int index; - int numOfStreams; + int32_t index; + int32_t numOfStreams; SCDesc * connInfo; SCDesc **cdesc; - SSDesc sdesc[]; + SCMStreamDesc sdesc[]; } SStreamShow; -int mgmtSaveQueryStreamList(char *cont, int contLen, SConnObj *pConn) { - SAcctObj *pAcct = pConn->pAcct; - - if (contLen <= 0 || pAcct == NULL) { - return 0; - } - - pthread_mutex_lock(&pAcct->mutex); - - if (pConn->pQList) { - pAcct->acctInfo.numOfQueries -= pConn->pQList->numOfQueries; - pAcct->acctInfo.numOfStreams -= pConn->pSList->numOfStreams; - } - - pConn->pQList = realloc(pConn->pQList, contLen); - memcpy(pConn->pQList, cont, contLen); - - pConn->pSList = (SSList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SQDesc) + sizeof(SQList)); - - pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries; - pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams; - - pthread_mutex_unlock(&pAcct->mutex); - - return 0; +int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) { +// SAcctObj *pAcct = pConn->pAcct; +// +// if (contLen <= 0 || pAcct == NULL) { +// return 0; +// } +// +// pthread_mutex_lock(&pAcct->mutex); +// +// if (pConn->pQList) { +// pAcct->acctInfo.numOfQueries -= pConn->pQList->numOfQueries; +// pAcct->acctInfo.numOfStreams -= pConn->pSList->numOfStreams; +// } +// +// pConn->pQList = realloc(pConn->pQList, contLen); +// memcpy(pConn->pQList, cont, contLen); +// +// pConn->pSList = (SCMStreamList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SCMQueryDesc) + sizeof(SCMQqueryList)); +// +// pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries; +// pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams; +// +// pthread_mutex_unlock(&pAcct->mutex); + + return TSDB_CODE_SUCCESS; } -int mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) { +int32_t mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) { SAcctObj * pAcct = pConn->pAcct; SQueryShow *pQueryShow; pthread_mutex_lock(&pAcct->mutex); - pQueryShow = malloc(sizeof(SQDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow)); + pQueryShow = malloc(sizeof(SCMQueryDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow)); pQueryShow->numOfQueries = 0; pQueryShow->index = 0; pQueryShow->connInfo = NULL; @@ -87,7 +87,7 @@ int mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) { pQueryShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfQueries * sizeof(SCDesc *)); pConn = pAcct->pConn; - SQDesc * pQdesc = pQueryShow->qdesc; + SCMQueryDesc * pQdesc = pQueryShow->qdesc; SCDesc * pCDesc = pQueryShow->connInfo; SCDesc **ppCDesc = pQueryShow->cdesc; @@ -97,10 +97,10 @@ int mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) { pCDesc->port = pConn->port; strcpy(pCDesc->user, pConn->pUser->user); - memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SQDesc) * pConn->pQList->numOfQueries); + memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SCMQueryDesc) * pConn->pQList->numOfQueries); pQdesc += pConn->pQList->numOfQueries; pQueryShow->numOfQueries += pConn->pQList->numOfQueries; - for (int i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc; + for (int32_t i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc; pCDesc++; } @@ -117,8 +117,8 @@ int mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) { return 0; } -int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; +int32_t mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + int32_t cols = 0; SSchema *pSchema = tsGetSchema(pMeta); @@ -156,7 +156,7 @@ int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->numOfRows = 1000000; pShow->pNode = NULL; @@ -166,7 +166,7 @@ int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return 0; } -int mgmtKillQuery(char *qidstr, SConnObj *pConn) { +int32_t mgmtKillQuery(char *qidstr, SConnObj *pConn) { char *temp, *chr, idstr[64]; strcpy(idstr, qidstr); @@ -192,8 +192,8 @@ int mgmtKillQuery(char *qidstr, SConnObj *pConn) { pConn = pAcct->pConn; while (pConn) { if (pConn->ip == ip && pConn->port == port && pConn->pQList) { - int i; - SQDesc *pQDesc = pConn->pQList->qdesc; + int32_t i; + SCMQueryDesc *pQDesc = pConn->pQList->qdesc; for (i = 0; i < pConn->pQList->numOfQueries; ++i, ++pQDesc) { if (pQDesc->queryId == queryId) break; } @@ -219,17 +219,17 @@ _error: return TSDB_CODE_INVALID_QUERY_ID; } -int mgmtRetrieveQueries(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; +int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + int32_t numOfRows = 0; char *pWrite; - int cols = 0; + int32_t cols = 0; SQueryShow *pQueryShow = (SQueryShow *)pShow->pNode; if (rows > pQueryShow->numOfQueries - pQueryShow->index) rows = pQueryShow->numOfQueries - pQueryShow->index; while (numOfRows < rows) { - SQDesc *pNode = pQueryShow->qdesc + pQueryShow->index; + SCMQueryDesc *pNode = pQueryShow->qdesc + pQueryShow->index; SCDesc *pCDesc = pQueryShow->cdesc[pQueryShow->index]; cols = 0; @@ -269,13 +269,13 @@ int mgmtRetrieveQueries(SShowObj *pShow, char *data, int rows, SConnObj *pConn) return numOfRows; } -int mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) { +int32_t mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) { SAcctObj * pAcct = pConn->pAcct; SStreamShow *pStreamShow; pthread_mutex_lock(&pAcct->mutex); - pStreamShow = malloc(sizeof(SSDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow)); + pStreamShow = malloc(sizeof(SCMStreamDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow)); pStreamShow->numOfStreams = 0; pStreamShow->index = 0; pStreamShow->connInfo = NULL; @@ -286,7 +286,7 @@ int mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) { pStreamShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfStreams * sizeof(SCDesc *)); pConn = pAcct->pConn; - SSDesc * pSdesc = pStreamShow->sdesc; + SCMStreamDesc * pSdesc = pStreamShow->sdesc; SCDesc * pCDesc = pStreamShow->connInfo; SCDesc **ppCDesc = pStreamShow->cdesc; @@ -296,10 +296,10 @@ int mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) { pCDesc->port = pConn->port; strcpy(pCDesc->user, pConn->pUser->user); - memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SSDesc) * pConn->pSList->numOfStreams); + memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SCMStreamDesc) * pConn->pSList->numOfStreams); pSdesc += pConn->pSList->numOfStreams; pStreamShow->numOfStreams += pConn->pSList->numOfStreams; - for (int i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc; + for (int32_t i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc; pCDesc++; } @@ -316,8 +316,8 @@ int mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) { return 0; } -int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; +int32_t mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + int32_t cols = 0; SSchema *pSchema = tsGetSchema(pMeta); pShow->bytes[cols] = TSDB_USER_LEN; @@ -366,7 +366,7 @@ int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->numOfRows = 1000000; pShow->pNode = NULL; @@ -376,17 +376,17 @@ int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return 0; } -int mgmtRetrieveStreams(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; +int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + int32_t numOfRows = 0; char *pWrite; - int cols = 0; + int32_t cols = 0; SStreamShow *pStreamShow = (SStreamShow *)pShow->pNode; if (rows > pStreamShow->numOfStreams - pStreamShow->index) rows = pStreamShow->numOfStreams - pStreamShow->index; while (numOfRows < rows) { - SSDesc *pNode = pStreamShow->sdesc + pStreamShow->index; + SCMStreamDesc *pNode = pStreamShow->sdesc + pStreamShow->index; SCDesc *pCDesc = pStreamShow->cdesc[pStreamShow->index]; cols = 0; @@ -434,7 +434,7 @@ int mgmtRetrieveStreams(SShowObj *pShow, char *data, int rows, SConnObj *pConn) return numOfRows; } -int mgmtKillStream(char *qidstr, SConnObj *pConn) { +int32_t mgmtKillStream(char *qidstr, SConnObj *pConn) { char *temp, *chr, idstr[64]; strcpy(idstr, qidstr); @@ -460,8 +460,8 @@ int mgmtKillStream(char *qidstr, SConnObj *pConn) { pConn = pAcct->pConn; while (pConn) { if (pConn->ip == ip && pConn->port == port && pConn->pSList) { - int i; - SSDesc *pSDesc = pConn->pSList->sdesc; + int32_t i; + SCMStreamDesc *pSDesc = pConn->pSList->sdesc; for (i = 0; i < pConn->pSList->numOfStreams; ++i, ++pSDesc) { if (pSDesc->streamId == streamId) break; } @@ -487,7 +487,7 @@ _error: return TSDB_CODE_INVALID_STREAM_ID; } -int mgmtKillConnection(char *qidstr, SConnObj *pConn) { +int32_t mgmtKillConnection(char *qidstr, SConnObj *pConn) { SConnObj *pConn1 = NULL; char * temp, *chr, idstr[64]; strcpy(idstr, qidstr); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 82a1d6dd1fc0a6e01ad67873cfd8de2c8b63c58a..0b0e627d77dcb231dee2eb9a9156ebcbd97dd620 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -46,9 +46,11 @@ static void mgmtInitShowMsgFp(); void * tsShellConn = NULL; SConnObj *connList; -void mgmtProcessMsgFromShell(char type, void *pCont, int32_t contLen, void *ahandle, int32_t code); -int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); -int32_t (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int32_t, SConnObj *); + +static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code); +static int32_t (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, void *ahandle); +static int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); + void mgmtInitProcessShellMsg(); int32_t mgmtRedirectMsg(SConnObj *pConn, int32_t msgType); int32_t mgmtKillQuery(char *queryId, SConnObj *pConn); @@ -156,31 +158,8 @@ static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMe // return 0; //} -/** - * check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one. - * - * @param pMsg - * @return - */ -bool mgmtCheckMeterMetaMsgType(char *pMsg) { -// SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; -// -// int16_t autoCreate = htons(pInfo->createFlag); -// STableInfo *table = mgmtGetTable(pInfo->meterId); - -// If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue -// bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1); -// if (addIntoTranQueue) { -// mTrace("meter:%s auto created task added", pInfo->meterId); -// } - -// bool addIntoTranQueue = true; - -// return addIntoTranQueue; - return 0; -} -int32_t mgmtProcessMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) { // SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; // STabObj * pMeterObj = NULL; // SVgObj * pVgroup = NULL; @@ -352,7 +331,7 @@ int32_t mgmtProcessMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { * | | | * pStart pCurMeter pTail **/ -int32_t mgmtProcessMultiMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessMultiMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) { // SDbObj * pDbObj = NULL; // STabObj * pMeterObj = NULL; // SVgObj * pVgroup = NULL; @@ -507,7 +486,7 @@ int32_t mgmtProcessMultiMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn return 0; } -int32_t mgmtProcessMetricMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessMetricMetaMsg(void *pCont, int32_t contLen, void *ahandle) { // SSuperTableMetaMsg *pSuperTableMetaMsg = (SSuperTableMetaMsg *)pMsg; // STabObj * pMetric; // STaosRsp * pRsp; @@ -558,7 +537,7 @@ int32_t mgmtProcessMetricMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessCreateDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) { // SCreateDbMsg *pCreate = (SCreateDbMsg *)pMsg; // int32_t code = 0; // @@ -593,12 +572,12 @@ int32_t mgmtProcessCreateDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessCreateMnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateMnodeMsg(void *pCont, int32_t contLen, void *ahandle) { // return rpcSendResponse(pConn->thandle, TSDB_MSG_TYPE_CREATE_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return 0; } -int32_t mgmtProcessAlterDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessAlterDbMsg(void *pCont, int32_t contLen, void *ahandle) { // SAlterDbMsg *pAlter = (SAlterDbMsg *)pMsg; // int32_t code = 0; // @@ -624,7 +603,7 @@ int32_t mgmtProcessAlterDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessKillQueryMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessKillQueryMsg(void *pCont, int32_t contLen, void *ahandle) { // int32_t code = 0; // SKillQuery *pKill = (SKillQuery *)pMsg; // @@ -639,7 +618,7 @@ int32_t mgmtProcessKillQueryMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessKillStreamMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessKillStreamMsg(void *pCont, int32_t contLen, void *ahandle) { // int32_t code = 0; // SKillStream *pKill = (SKillStream *)pMsg; // @@ -654,7 +633,7 @@ int32_t mgmtProcessKillStreamMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessKillConnectionMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessKillConnectionMsg(void *pCont, int32_t contLen, void *ahandle) { // int32_t code = 0; // SKillConnection *pKill = (SKillConnection *)pMsg; // @@ -669,7 +648,7 @@ int32_t mgmtProcessKillConnectionMsg(char *pMsg, int32_t msgLen, SConnObj *pConn return 0; } -int32_t mgmtProcessCreateUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) { // SCreateUserMsg *pCreate = (SCreateUserMsg *)pMsg; // int32_t code = 0; // @@ -691,7 +670,7 @@ int32_t mgmtProcessCreateUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessAlterUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) { // SAlterUserMsg *pAlter = (SAlterUserMsg *)pMsg; // int32_t code = 0; // SUserObj * pUser; @@ -803,7 +782,7 @@ int32_t mgmtProcessAlterUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessDropUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) { // SDropUserMsg *pDrop = (SDropUserMsg *)pMsg; // int32_t code = 0; // SUserObj * pUser; @@ -862,7 +841,7 @@ int32_t mgmtProcessDropUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessDropDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) { // SDropDbMsg *pDrop = (SDropDbMsg *)pMsg; // int32_t code; // @@ -883,7 +862,7 @@ int32_t mgmtProcessDropDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessUseDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessUseDbMsg(void *pCont, int32_t contLen, void *ahandle) { // SUseDbMsg *pUse = (SUseDbMsg *)pMsg; // int32_t code; // @@ -933,7 +912,7 @@ static void mgmtInitShowMsgFp() { mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes; } -int32_t mgmtProcessShowMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { // SShowMsg * pShowMsg = (SShowMsg *)pMsg; // STaosRsp * pRsp; // char * pStart; @@ -992,7 +971,7 @@ int32_t mgmtProcessShowMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessRetrieveMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { // SRetrieveMeterMsg *pRetrieve; // SRetrieveMeterRsp *pRsp; // int32_t rowsToRead = 0, size = 0, rowsRead = 0; @@ -1080,7 +1059,7 @@ int32_t mgmtProcessRetrieveMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessCreateTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { // SCreateTableMsg *pCreate = (SCreateTableMsg *)pMsg; // int32_t code; // SSchema * pSchema; @@ -1135,7 +1114,7 @@ int32_t mgmtProcessCreateTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessDropTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) { // SDropTableMsg *pDrop = (SDropTableMsg *)pMsg; // int32_t code; // @@ -1161,7 +1140,7 @@ int32_t mgmtProcessDropTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessAlterTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessAlterTableMsg(void *pCont, int32_t contLen, void *ahandle) { // SAlterTableMsg *pAlter = (SAlterTableMsg *)pMsg; // int32_t code; // @@ -1202,7 +1181,7 @@ int32_t mgmtProcessAlterTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { return 0; } -int32_t mgmtProcessCfgDnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessCfgDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { // int32_t code = 0; // SCfgMsg *pCfg = (SCfgMsg *)pMsg; // @@ -1220,80 +1199,51 @@ int32_t mgmtProcessCfgDnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { // // if (code == 0) mTrace("dnode:%s is configured by %s", pCfg->ip, pConn->pUser->user); // -// return 0; -//} -// -//int32_t mgmtProcessHeartBeatMsg(char *cont, int32_t contLen, SConnObj *pConn) { -// char * pStart, *pMsg; -// int32_t msgLen; -// STaosRsp *pRsp; -// -// mgmtSaveQueryStreamList(cont, contLen, pConn); -// -// pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_HEARTBEAT_RSP, 128); -// if (pStart == NULL) return 0; -// pMsg = pStart; -// pRsp = (STaosRsp *)pMsg; -// pRsp->code = 0; -// pMsg = (char *)pRsp->more; -// -// SHeartBeatRsp *pHBRsp = (SHeartBeatRsp *)pRsp->more; -// pHBRsp->queryId = pConn->queryId; -// pConn->queryId = 0; -// pHBRsp->streamId = pConn->streamId; -// pHBRsp->streamId = pConn->streamId; -// pConn->streamId = 0; -// pHBRsp->killConnection = pConn->killConnection; -// -// if (pConn->usePublicIp) { -// if (pSdbPublicIpList != NULL) { -// int32_t size = pSdbPublicIpList->numOfIps * 4; -// pHBRsp->ipList.numOfIps = pSdbPublicIpList->numOfIps; -// memcpy(pHBRsp->ipList.ip, pSdbPublicIpList->ip, size); -// pMsg += sizeof(SHeartBeatRsp) + size; -// } else { -// pHBRsp->ipList.numOfIps = 0; -// pMsg += sizeof(SHeartBeatRsp); -// } -// -// } else { -// if (pSdbIpList != NULL) { -// int32_t size = pSdbIpList->numOfIps * 4; -// pHBRsp->ipList.numOfIps = pSdbIpList->numOfIps; -// memcpy(pHBRsp->ipList.ip, pSdbIpList->ip, size); -// pMsg += sizeof(SHeartBeatRsp) + size; -// } else { -// pHBRsp->ipList.numOfIps = 0; -// pMsg += sizeof(SHeartBeatRsp); -// } -// } -// msgLen = pMsg - pStart; -// -// taosSendMsgToPeer(pConn->thandle, pStart, msgLen); - return 0; } -void mgmtEstablishConn(SConnObj *pConn) { -// atomic_fetch_add_32(&mgmtShellConns, 1); -// atomic_fetch_add_32(&sdbExtConns, 1); -// pConn->stime = taosGetTimestampMs(); -// -// if (strcmp(pConn->pUser->user, "root") == 0) { -// pConn->superAuth = 1; -// pConn->writeAuth = 1; -// } else { -// pConn->superAuth = pConn->pUser->superAuth; -// pConn->writeAuth = pConn->pUser->writeAuth; -// if (pConn->superAuth) { -// pConn->writeAuth = 1; -// } -// } -// -// int32_t tempint32; -// uint32_t tempuint32; -// taosGetRpcConnInfo(pConn->thandle, &tempuint32, &pConn->ip, &pConn->port, &tempint32, &tempint32); -// mgmtAddConnIntoAcct(pConn); +int32_t mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) { + SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) pCont; + mgmtSaveQueryStreamList(pHBMsg); + + SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(contLen); + if (pHBRsp == NULL) { + rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + rpcFreeCont(pCont); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + SRpcConnInfo connInfo; + rpcGetConnInfo(ahandle, &connInfo); + + pHBRsp->ipList.index = 0; + pHBRsp->ipList.port = htons(tsMgmtShellPort); + pHBRsp->ipList.numOfIps = 0; + if (pSdbPublicIpList != NULL && pSdbIpList != NULL) { + pHBRsp->ipList.numOfIps = htons(pSdbPublicIpList->numOfIps); + if (connInfo.serverIp == tsPublicIpInt) { + for (int i = 0; i < pSdbPublicIpList->numOfIps; ++i) { + pHBRsp->ipList.ip[i] = htonl(pSdbPublicIpList->ip[i]); + } + } else { + for (int i = 0; i < pSdbIpList->numOfIps; ++i) { + pHBRsp->ipList.ip[i] = htonl(pSdbIpList->ip[i]); + } + } + } + + /* + * TODO + * Dispose kill stream or kill query message + */ + pHBRsp->queryId = 0; + pHBRsp->streamId = 0; + pHBRsp->killConnection = 0; + + rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pHBRsp, sizeof(SCMHeartBeatMsg)); + rpcFreeCont(pCont); + + return TSDB_CODE_SUCCESS; } int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { @@ -1313,36 +1263,32 @@ int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secr return TSDB_CODE_SUCCESS; } -int32_t mgmtProcessConnectMsg(int8_t type, void *pCont, int32_t contLen, void *ahandle, int32_t code) { +static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *ahandle) { SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) pCont; SRpcConnInfo connInfo; rpcGetConnInfo(ahandle, &connInfo); + int32_t code; SUserObj *pUser = mgmtGetUser(connInfo.user); if (pUser == NULL) { - mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return TSDB_CODE_INVALID_USER; + code = TSDB_CODE_INVALID_USER; + goto connect_over; } if (mgmtCheckExpired()) { - mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(ahandle, TSDB_CODE_GRANT_EXPIRED, NULL, 0); - return TSDB_CODE_GRANT_EXPIRED; + code = TSDB_CODE_GRANT_EXPIRED; + goto connect_over; } SAcctObj *pAcct = mgmtGetAcct(pUser->acct); if (pAcct == NULL) { - mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_ACCT, NULL, 0); - return TSDB_CODE_INVALID_ACCT; + code = TSDB_CODE_INVALID_ACCT; + goto connect_over; } code = taosCheckVersion(pConnectMsg->clientVersion, version, 3); if (code != TSDB_CODE_SUCCESS) { - mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(ahandle, code, NULL, 0); - return code; + goto connect_over; } if (pConnectMsg->db[0]) { @@ -1350,131 +1296,101 @@ int32_t mgmtProcessConnectMsg(int8_t type, void *pCont, int32_t contLen, void *a sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db); SDbObj *pDb = mgmtGetDb(dbName); if (pDb == NULL) { - mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_DB, NULL, 0); - return TSDB_CODE_INVALID_DB; + code = TSDB_CODE_INVALID_DB; + goto connect_over; } } SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp)); if (pConnectRsp == NULL) { - mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + code = TSDB_CODE_SERV_OUT_OF_MEMORY; + goto connect_over; } sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); strcpy(pConnectRsp->serverVersion, version); pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->superAuth = pUser->superAuth; - - pConnectRsp->index = 0; + pConnectRsp->ipList.index = 0; + pConnectRsp->ipList.port = htons(tsMgmtShellPort); + pConnectRsp->ipList.numOfIps = 0; if (pSdbPublicIpList != NULL && pSdbIpList != NULL) { - pConnectRsp->numOfIps = htons(pSdbPublicIpList->numOfIps); - pConnectRsp->port = htons(tsMgmtShellPort); + pConnectRsp->ipList.numOfIps = htons(pSdbPublicIpList->numOfIps); if (connInfo.serverIp == tsPublicIpInt) { for (int i = 0; i < pSdbPublicIpList->numOfIps; ++i) { - pConnectRsp->ip[i] = htonl(pSdbPublicIpList->ip[i]); + pConnectRsp->ipList.ip[i] = htonl(pSdbPublicIpList->ip[i]); } } else { for (int i = 0; i < pSdbIpList->numOfIps; ++i) { - pConnectRsp->ip[i] = htonl(pSdbIpList->ip[i]); + pConnectRsp->ipList.ip[i] = htonl(pSdbIpList->ip[i]); } } + } + +connect_over: + if (code != TSDB_CODE_SUCCESS) { + mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); + rpcSendResponse(ahandle, code, NULL, 0); } else { - pConnectRsp->numOfIps = 0; - pConnectRsp->port = htons(tsMgmtShellPort); + mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); + rpcSendResponse(ahandle, code, pConnectRsp, sizeof(pConnectRsp)); } - mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - return TSDB_CODE_SUCCESS; + rpcFreeCont(pCont); + return code; } -void mgmtProcessMsgFromShell(char type, void *pCont, int32_t contLen, void *ahandle, int32_t code) { -// SIntMsg * pMsg = (SIntMsg *)msg; -// SConnObj *pConn = (SConnObj *)ahandle; -// -// if (msg == NULL) { -// if (pConn) { -// mgmtRemoveConnFromAcct(pConn); -// atomic_fetch_sub_32(&mgmtShellConns, 1); -// atomic_fetch_sub_32(&sdbExtConns, 1); -// mTrace("connection from %s is closed", pConn->pUser->user); -// memset(pConn, 0, sizeof(SConnObj)); -// } -// -// return NULL; -// } -// -//#ifdef CLUSTER -// if (sdbInited == NULL || sdbStatus != SDB_STATUS_SERVING) { -// taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); -// mTrace("shell msg is ignored since SDB is not ready"); -// } -//#endif -// -// if (pConn == NULL) { -// pConn = connList + pMsg->destId; -// pConn->thandle = thandle; -// strcpy(pConn->user, pMsg->meterId); -// pConn->usePublicIp = (pMsg->destIp == tsPublicIpInt ? 1 : 0); -// mTrace("pConn:%p is rebuild, destIp:%s publicIp:%s usePublicIp:%u", -// pConn, taosIpStr(pMsg->destIp), taosIpStr(tsPublicIpInt), pConn->usePublicIp); -// } -// -// if (pMsg->msgType == TSDB_MSG_TYPE_CONNECT) { -// (*mgmtProcessShellMsg[pMsg->msgType])((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pConn); -// } else { -// SMgmtHead *pHead = (SMgmtHead *)pMsg->content; -// if (pConn->pAcct == NULL) { -// pConn->pUser = mgmtGetUser(pConn->user); -// if (pConn->pUser) { -// pConn->pAcct = mgmtGetAcct(pConn->pUser->acct); -// mgmtEstablishConn(pConn); -// mTrace("login from:%x:%hu", pConn->ip, htons(pConn->port)); -// } -// } -// -// if (pConn->pAcct) { -// if (pConn->pDb == NULL || strncmp(pConn->pDb->name, pHead->db, tListLen(pConn->pDb->name)) != 0) { -// pConn->pDb = mgmtGetDb(pHead->db); -// } -// -// char *cont = (char *)pMsg->content + sizeof(SMgmtHead); -// int32_t contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); -// -// // read-only request can be executed concurrently -// if ((pMsg->msgType == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(cont))) || -// pMsg->msgType == TSDB_MSG_TYPE_STABLE_META || pMsg->msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE || -// pMsg->msgType == TSDB_MSG_TYPE_SHOW || pMsg->msgType == TSDB_MSG_TYPE_MULTI_TABLE_META) { -// (*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn); -// } else { -// if (mgmtProcessShellMsg[pMsg->msgType]) { -// SSchedMsg schedMsg; -// schedMsg.msg = malloc(pMsg->msgLen); // Message to deal with -// memcpy(schedMsg.msg, pMsg, pMsg->msgLen); -// -// schedMsg.fp = mgmtProcessTranRequest; -// schedMsg.tfp = NULL; -// schedMsg.thandle = pConn; -// -// taosScheduleTask(tsMgmtTranQhandle, &schedMsg); -// } else { -// mError("%s from shell is not processed", taosMsg[pMsg->msgType]); -// } -// } -// } else { -// taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_DISCONNECTED); -// } -// } -// -// if (pConn->pAcct == NULL) { -// taosCloseRpcConn(pConn->thandle); -// memset(pConn, 0, sizeof(SConnObj)); // close the connection; -// pConn = NULL; -// } -// -// return pConn; + +/** + * check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one. + */ +static bool mgmtCheckMeterMetaMsgType(void *pMsg) { + SMeterInfoMsg *pInfo = (SMeterInfoMsg *) pMsg; + int16_t autoCreate = htons(pInfo->createFlag); + STableInfo *pTable = mgmtGetTable(pInfo->meterId); + + // If table does not exists and autoCreate flag is set, we add the handler into task queue + bool addIntoTranQueue = (pTable == NULL && autoCreate == 1); + if (addIntoTranQueue) { + mTrace("meter:%s auto created task added", pInfo->meterId); + } + + return addIntoTranQueue; +} + +static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { + if ((type == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) || + type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_DNODE_RETRIEVE || + type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_MULTI_TABLE_META || + type == TSDB_MSG_TYPE_CONNECT) { + return true; + } + + return false; +} + +static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code) { + if (sdbGetRunStatus() != SDB_STATUS_SERVING) { + mTrace("shell msg is ignored since SDB is not ready"); + rpcSendResponse(ahandle, TSDB_CODE_NOT_READY, NULL, 0); + return; + } + + if (mgmtCheckMsgReadOnly(type, pCont)) { + (*mgmtProcessShellMsg[(int8_t)type])(pCont, contLen, ahandle); + } else { + if (mgmtProcessShellMsg[(int8_t)type]) { + SSchedMsg schedMsg; + schedMsg.msg = malloc(contLen); + memcpy(schedMsg.msg, pCont, contLen); + schedMsg.fp = mgmtProcessTranRequest; + schedMsg.tfp = NULL; + schedMsg.thandle = ahandle; + taosScheduleTask(tsMgmtTranQhandle, &schedMsg); + } else { + mError("%s from shell is not processed", taosMsg[(int8_t)type]); + } + } } void mgmtInitProcessShellMsg() { @@ -1499,7 +1415,7 @@ void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_RETRIEVE] = mgmtProcessRetrieveMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; -// mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DNODE] = mgmtProcessCreateDnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DNODE] = mgmtProcessDropDnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessCreateMnodeMsg; @@ -1516,44 +1432,44 @@ int32_t mgmtCheckRedirectMsgImp(SConnObj *pConn, int32_t msgType) { } int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType) = mgmtCheckRedirectMsgImp; -int32_t mgmtProcessAlterAcctMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessAlterAcctMsgImp(void *pCont, int32_t contLen, void *ahandle) { //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_ALTER_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return 0; } -int32_t (*mgmtProcessAlterAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessAlterAcctMsgImp; +int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessAlterAcctMsgImp; -int32_t mgmtProcessCreateDnodeMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateDnodeMsgImp(void *pCont, int32_t contLen, void *ahandle) { //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_DNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return 0; } -int32_t (*mgmtProcessCreateDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessCreateDnodeMsgImp; +int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessCreateDnodeMsgImp; -int32_t mgmtProcessCfgMnodeMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessCfgMnodeMsgImp(void *pCont, int32_t contLen, void *ahandle) { //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CFG_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return 0; } -int32_t (*mgmtProcessCfgMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessCfgMnodeMsgImp; +int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessCfgMnodeMsgImp; -int32_t mgmtProcessDropMnodeMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessDropMnodeMsgImp(void *pCont, int32_t contLen, void *ahandle) { //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return 0; } -int32_t (*mgmtProcessDropMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessDropMnodeMsgImp; +int32_t (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessDropMnodeMsgImp; -int32_t mgmtProcessDropDnodeMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessDropDnodeMsgImp(void *pCont, int32_t contLen, void *ahandle) { //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_DNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return 0; } -int32_t (*mgmtProcessDropDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessDropDnodeMsgImp; +int32_t (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessDropDnodeMsgImp; -int32_t mgmtProcessDropAcctMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessDropAcctMsgImp(void *pCont, int32_t contLen, void *ahandle) { // return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return 0; } -int32_t (*mgmtProcessDropAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessDropAcctMsgImp; +int32_t (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessDropAcctMsgImp; -int32_t mgmtProcessCreateAcctMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { +int32_t mgmtProcessCreateAcctMsgImp(void *pCont, int32_t contLen, void *ahandle) { // return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT); return 0; } -int32_t (*mgmtProcessCreateAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessCreateAcctMsgImp; \ No newline at end of file +int32_t (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessCreateAcctMsgImp; \ No newline at end of file diff --git a/src/sdb/src/sdbEngine.c b/src/sdb/src/sdbEngine.c index fdf79607249f3721aa487fb0344ca0bafb2d2642..96ec56ae1a79bfcbce92f6fc4745c5e906af0594 100644 --- a/src/sdb/src/sdbEngine.c +++ b/src/sdb/src/sdbEngine.c @@ -24,8 +24,9 @@ extern char version[]; const int16_t sdbFileVersion = 0; int sdbExtConns = 0; -SIpList *pSdbIpList = NULL; -SIpList *pSdbPublicIpList = NULL; +SRpcIpSet *pSdbIpList = NULL; +SRpcIpSet *pSdbPublicIpList = NULL; +SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self #ifdef CLUSTER int sdbMaster = 0; @@ -57,6 +58,13 @@ int64_t sdbGetVersion() { return sdbVersion; }; +int32_t sdbGetRunStatus() { + if (sdbInited == NULL) { + return SDB_STATUS_OFFLINE; + } + return sdbStatus; +} + void sdbFinishCommit(void *handle) { SSdbTable *pTable = (SSdbTable *)handle; uint32_t sdbEcommit = SDB_ENDCOMMIT;