diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 1281ce98ac3299096934f69d0985f045707a2e2e..7be1bd6e30e36ea072edf287e25cf83bb0cf0ade 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -57,7 +57,6 @@ void dnodeCleanupMClient() { } static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { - if (dnodeProcessMgmtRspFp[pMsg->msgType]) { (*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg); } else { diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 7131440ec12ed16c8100fe5085acf3f8cb675645..72b6241334999793525f9efb4437324ec1f66e96 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -98,14 +98,13 @@ void dnodeMgmt(SRpcMsg *pMsg) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); } else { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; + SRpcMsg rsp; + rsp.handle = pMsg->handle; + rsp.code = TSDB_CODE_MSG_NOT_PROCESSED; + rsp.pCont = NULL; + rpcSendResponse(&rsp); } - SRpcMsg rsp; - rsp.handle = pMsg->handle; - rsp.code = terrno; - rsp.pCont = NULL; - rpcSendResponse(&rsp); rpcFreeCont(pMsg->pCont); // free the received message } @@ -275,15 +274,15 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); - if (pVnodeObj != NULL) { - rpcRsp.code = TSDB_CODE_SUCCESS; - } else { - rpcRsp.code = dnodeCreateVnode(pCreate); - } +// SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); +// if (pVnodeObj != NULL) { +// rpcRsp.code = TSDB_CODE_SUCCESS; +// } else { +// rpcRsp.code = dnodeCreateVnode(pCreate); +// } + rpcRsp.code = TSDB_CODE_SUCCESS; rpcSendResponse(&rpcRsp); - rpcFreeCont(rpcMsg->pCont); } static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { @@ -301,7 +300,6 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { } rpcSendResponse(&rpcRsp); - rpcFreeCont(rpcMsg->pCont); } static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { @@ -321,7 +319,6 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { } rpcSendResponse(&rpcRsp); - rpcFreeCont(rpcMsg->pCont); } static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { diff --git a/src/inc/mnode.h b/src/inc/mnode.h index d206577cdc63f4d2f1d9c4a124c09b02b7ed123a..44cd67334683c9b158cf25913a7c81da218e565f 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -39,12 +39,7 @@ extern "C" { #include "ttimer.h" #include "tutil.h" -// internal globals -extern char version[]; -extern void *tsMgmtTmr; -extern char tsMgmtDirectory[]; - -typedef struct { + typedef struct { uint32_t privateIp; int32_t sid; uint32_t moduleStatus; @@ -87,11 +82,6 @@ typedef struct { int32_t vnode; } SVnodeGid; -typedef struct { - int32_t sid; - int32_t vgId; // vnode group ID -} STableGid; - typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; @@ -248,16 +238,32 @@ typedef struct { int16_t offset[TSDB_MAX_COLUMNS]; int16_t bytes[TSDB_MAX_COLUMNS]; void * signature; - uint16_t payloadLen; /* length of payload*/ - char payload[]; /* payload for wildcard match in show tables */ + uint16_t payloadLen; + char payload[]; } SShowObj; -//mgmtSystem +typedef struct { + uint8_t msgType; + int8_t expected; + int8_t received; + int8_t successed; + int32_t contLen; + int32_t code; + void *ahandle; + void *thandle; + void *pCont; + SDbObj *pDb; + SUserObj *pUser; +} SQueuedMsg; + int32_t mgmtInitSystem(); int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); void mgmtStopSystem(); +extern char version[]; +extern void *tsMgmtTmr; +extern char tsMgmtDirectory[]; #ifdef __cplusplus } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b83b81d358a8fbcba28bb9bad44e5765431a1ef6..225c25ff08c4c9533ed92cf89001b7f88f5c661e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -238,6 +238,7 @@ typedef struct SSchema { } SSchema; typedef struct { + int32_t vgId; int32_t vnode; //the index of vnode uint32_t ip; } SVnodeDesc; diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 31904666586cd5a1f7c349f07b75d0e90fc48762..7f9fd9622c7b782e1d796fc36f6340198702934b 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -28,22 +28,6 @@ bool mgmtCheckQhandle(uint64_t qhandle); void mgmtSaveQhandle(void *qhandle); void mgmtFreeQhandle(void *qhandle); -enum { - TSDB_PROCESS_CREATE_VGROUP, - TSDB_PROCESS_CREATE_VGROUP_GET_META, - TSDB_PROCESS_CREATE_TABLE, - TSDB_PROCESS_CREATE_TABLE_GET_META, -}; - -typedef struct { - void *thandle; // come from uplayer - void *ahandle; // object to process - void *cont; // additional information of object to process - int32_t type; // the type of sync process - int32_t received; // num of received, such as numOfVnodes - int32_t contLen; // the length of additional information -} SProcessInfo; - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index ff089dad7ebb110388002d93eaf9252562a4c1cd..a9af568547802fca47a913a46cddf94951a9af72 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -23,13 +23,16 @@ extern "C" { int32_t mgmtInitShell(); void mgmtCleanUpShell(); -void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); +void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SQueuedMsg *queuedMsg)); typedef int32_t (*SShowMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); +void mgmtAddToShellQueue(SQueuedMsg *queuedMsg); +void mgmtSendSimpleResp(void *thandle, int32_t code); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 2d94d48882ab5b337041f06af11787bec7b747e8..514b903db8010b741c26a2b3d268f5ac0f5586cc 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -33,7 +33,6 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtCreateTable(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter); @@ -44,10 +43,6 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty); SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); -void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); -void mgmtProcessCreateTable(SVgObj *pVgroup, SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); -void mgmtProcessCreateVgroup(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 975a10dc47ca2303b52ef02c637358370577ea10..7ac97076c7f9113cb45082692213e02a5c76fe27 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -29,7 +29,7 @@ void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); -SVgObj *mgmtCreateVgroup(SDbObj *pDb); +void mgmtCreateVgroup(SQueuedMsg *pMsg); int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup); diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 81f410548e13e606cea3375434e1635c962824e3..fae237c4f55d1ab4ed4a2eca0c47759141831207 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -56,10 +56,10 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { } if (selectedVnode == -1) { - mError("vgroup:%d alloc vnode failed, free vnodes:%d", pVgroup->vgId, pDnode->numOfFreeVnodes); + mError("alloc vnode failed, free vnodes:%d", pDnode->numOfFreeVnodes); return -1; } else { - mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode); + mTrace("allocate vnode:%d, last allocated vnode:%d", selectedVnode, lastAllocVode); pVgroup->vnodeGid[0].vnode = selectedVnode; pDnode->lastAllocVnode = selectedVnode + 1; if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0; diff --git a/src/mnode/src/mgmtDClient.c b/src/mnode/src/mgmtDClient.c index 4670663222dde7b01521976e6ba7912f2247c701..59b2402b8cf045c8e72078d9dd6c3bdd358c6576 100644 --- a/src/mnode/src/mgmtDClient.c +++ b/src/mnode/src/mgmtDClient.c @@ -84,39 +84,6 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { rpcFreeCont(rpcMsg->pCont); } - -//static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { -// mTrace("create table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -// if (rpcMsg->handle == NULL) return; -// -// SProcessInfo *info = rpcMsg->handle; -// assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META); -// -// STableInfo *pTable = info->ahandle; -// if (rpcMsg->code != TSDB_CODE_SUCCESS) { -// mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId, rpcMsg->code); -// mgmtSetTableDirty(pTable, true); -// } else { -// mTrace("table:%s, created in dnode", pTable->tableId); -// mgmtSetTableDirty(pTable, false); -// } -// -// if (rpcMsg->code != TSDB_CODE_SUCCESS) { -// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = rpcMsg->code, .msgType = 0}; -// rpcSendResponse(&rpcMsg); -// } else { -// if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) { -// mTrace("table:%s, start to process get meta", pTable->tableId); -// mgmtProcessGetTableMeta(pTable, rpcMsg->handle); -// } else { -// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; -// rpcSendResponse(&rpcMsg); -// } -// } -// -// free(info); -//} -// //static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { // mTrace("drop table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); //} @@ -125,27 +92,6 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { // mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); //} // -//static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { -// mTrace("create vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -// if (rpcMsg->handle == NULL) return; -// -// SProcessInfo *info = rpcMsg->handle; -// assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META); -// -// info->received++; -// SVgObj *pVgroup = info->ahandle; -// -// bool isGetMeta = false; -// if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) { -// isGetMeta = true; -// } -// -// mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes); -// if (info->received == pVgroup->numOfVnodes) { -// mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta); -// free(info); -// } -//} // //static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { // mTrace("drop vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtDServer.c index 469c903d833a265bd4307dceae420f89b9859aab..faf6c480c3437eeed7f830ec09ea1286c63c78d5 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtDServer.c @@ -210,22 +210,6 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s // } //} // -//void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { -// mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); -// for (int i = 0; i < pVgroup->numOfVnodes; ++i) { -// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); -// mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); -// } -//} -// -//void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { -// mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); -// SMDCreateVnodeMsg *pVpeer = mgmtBuildCreateVnodeMsg(pVgroup, vnode); -// if (pVpeer != NULL) { -// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_CREATE_VNODE, pVpeer, sizeof(SMDCreateVnodeMsg), ahandle); -// } -//} -// //void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { // if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { // mError("invalid msg type:%d", msgType); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 5670d2a384a8e2376eeb824c5851dba96593ed77..34ad141a841bf08a18d6462cfd71f2c514038e72 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -41,9 +41,9 @@ static int32_t mgmtDropDb(SDbObj *pDb); static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg); -static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg); -static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg); +static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg); +static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg); +static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg); static void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); static void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize); @@ -383,6 +383,7 @@ static void mgmtDropDbFromSdb(SDbObj *pDb) { } static int32_t mgmtDropDb(SDbObj *pDb) { + if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) { bool finished = mgmtCheckDropDbFinished(pDb); if (!finished) { @@ -405,6 +406,7 @@ static int32_t mgmtDropDb(SDbObj *pDb) { } } +UNUSED_FUNC static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name); if (pDb == NULL) { @@ -904,19 +906,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfTables, -1); } -static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; - - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } - - SCMCreateDbMsg *pCreate = (SCMCreateDbMsg *) rpcMsg->pCont; +static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; + SCMCreateDbMsg *pCreate = pMsg->pCont; pCreate->maxSessions = htonl(pCreate->maxSessions); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->daysPerFile = htonl(pCreate->daysPerFile); @@ -928,69 +921,58 @@ static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) { pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock); // pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks); + int32_t code; if (mgmtCheckExpired()) { - rpcRsp.code = TSDB_CODE_GRANT_EXPIRED; - } else if (!pUser->writeAuth) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_GRANT_EXPIRED; + } else if (!pMsg->pUser->writeAuth) { + code = TSDB_CODE_NO_RIGHTS; } else { - rpcRsp.code = mgmtCreateDb(pUser->pAcct, pCreate); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { - mLPrint("DB:%s is created by %s", pCreate->db, pUser->user); + code = mgmtCreateDb(pMsg->pUser->pAcct, pCreate); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("DB:%s is created by %s", pCreate->db, pMsg->pUser->user); } } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } -static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; - - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } +static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; - SCMAlterDbMsg *pAlter = (SCMAlterDbMsg *) rpcMsg->pCont; + SCMAlterDbMsg *pAlter = pMsg->pCont; pAlter->daysPerFile = htonl(pAlter->daysPerFile); pAlter->daysToKeep = htonl(pAlter->daysToKeep); pAlter->maxSessions = htonl(pAlter->maxSessions) + 1; - if (!pUser->writeAuth) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + int32_t code; + if (!pMsg->pUser->writeAuth) { + code = TSDB_CODE_NO_RIGHTS; } else { - rpcRsp.code = mgmtAlterDb(pUser->pAcct, pAlter); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { - mLPrint("DB:%s is altered by %s", pAlter->db, pUser->user); + code = mgmtAlterDb(pMsg->pUser->pAcct, pAlter); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("DB:%s is altered by %s", pAlter->db, pMsg->pUser->user); } } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } +static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; -static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; - - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return ; - } - - if (pUser->superAuth) { - SCMDropDbMsg *pDrop = rpcMsg->pCont; - rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { - mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); - } + int32_t code; + if (pMsg->pUser->superAuth) { + code = TSDB_CODE_OPS_NOT_SUPPORT; + //SCMDropDbMsg *pDrop = rpcMsg->pCont; + //rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); + //if (rpcRsp.code == TSDB_CODE_SUCCESS) { + // mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); + //} } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + if (code != TSDB_CODE_SUCCESS) { + mgmtSendSimpleResp(pMsg->thandle, code); + } } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 576b7ebf6ef6088b79ecd7db99397faa4519cce1..c48b4477e5021dcead416b36d6c86b3823de10b6 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -43,7 +43,7 @@ static int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg); +static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg); void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; @@ -93,7 +93,7 @@ void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) { memset(pVload, 0, sizeof(SVnodeLoad)); pVload->vnode = vnodeGid[i].vnode; pVload->vgId = vgId; - mTrace("dnode:%s, vnode:%d add to vgroup:%d", taosIpStr(vnodeGid[i].ip), vnodeGid[i].vnode, pVload->vgId); + mTrace("dnode:%s, vnode:%d add to vgroup:%d", taosIpStr(pDnode->privateIp), vnodeGid[i].vnode, pVload->vgId); mgmtCalcNumOfFreeVnodes(pDnode); } else { mError("dnode:%s, not in dnode DB!!!", taosIpStr(vnodeGid[i].ip)); @@ -527,21 +527,14 @@ bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) { return pDnode->status == TSDB_DN_STATUS_OFFLINE; } -void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } - - SCMCfgDnodeMsg *pCmCfgDnode = (SCMCfgDnodeMsg *) rpcMsg->pCont; + SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont; uint32_t dnodeIp = inet_addr(pCmCfgDnode->ip); - if (strcmp(pUser->pAcct->user, "root") != 0) { + if (strcmp(pMsg->pUser->pAcct->user, "root") != 0) { rpcRsp.code = TSDB_CODE_NO_RIGHTS; } else { SRpcIpSet ipSet = mgmtGetIpSetFromIp(dnodeIp); @@ -560,7 +553,7 @@ void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) { } if (rpcRsp.code == TSDB_CODE_SUCCESS) { - mTrace("dnode:%s is configured by %s", pCmCfgDnode->ip, pUser->user); + mTrace("dnode:%s is configured by %s", pCmCfgDnode->ip, pMsg->pUser->user); } rpcSendResponse(&rpcRsp); diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index e16a7f166c7b1c25c34c4b662caedaf57acc7e0e..bb192ce8a56fd353da3dd9ce92046b3eab2154c2 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -558,9 +558,11 @@ bool mgmtCheckQhandle(uint64_t qhandle) { } void mgmtSaveQhandle(void *qhandle) { + mTrace("qhandle:%p is allocated", qhandle); } void mgmtFreeQhandle(void *qhandle) { + mTrace("qhandle:%p is freed", qhandle); } int mgmtGetConns(SShowObj *pShow, void *pConn) { @@ -673,72 +675,72 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn return numOfRows; } -void mgmtProcessKillQueryMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; rpcSendResponse(&rpcRsp); return; } - SCMKillQueryMsg *pKill = (SCMKillQueryMsg *) rpcMsg->pCont; + SCMKillQueryMsg *pKill = pMsg->pCont; int32_t code; if (!pUser->writeAuth) { code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtKillQuery(pKill->queryId, rpcMsg->handle); + code = mgmtKillQuery(pKill->queryId, pMsg->thandle); } rpcRsp.code = code; rpcSendResponse(&rpcRsp); } -void mgmtProcessKillStreamMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; rpcSendResponse(&rpcRsp); return; } - SCMKillStreamMsg *pKill = (SCMKillStreamMsg *) rpcMsg->pCont; + SCMKillStreamMsg *pKill = pMsg->pCont; int32_t code; if (!pUser->writeAuth) { code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtKillStream(pKill->queryId, rpcMsg->handle); + code = mgmtKillStream(pKill->queryId, pMsg->thandle); } rpcRsp.code = code; rpcSendResponse(&rpcRsp); } -void mgmtProcessKillConnectionMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; rpcSendResponse(&rpcRsp); return; } - SCMKillConnMsg *pKill = (SCMKillConnMsg *) rpcMsg->pCont; + SCMKillConnMsg *pKill = pMsg->pCont; int32_t code; if (!pUser->writeAuth) { code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtKillConnection(pKill->queryId, rpcMsg->handle); + code = mgmtKillConnection(pKill->queryId, pMsg->thandle); } rpcRsp.code = code; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 094848c8c081ba60495d28493ca6b05139d99a38..058173b13f889ebe159f46d5f9e6526077491863 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -41,24 +41,27 @@ typedef int32_t (*SShowMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); -static void mgmtProcessShowMsg(SRpcMsg *rpcMsg); -static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg); -static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont); -static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg); -static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg); +static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); +static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); +static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg); +static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); +static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); +static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg); +static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg); static void *tsMgmtShellRpc = NULL; static void *tsMgmtTranQhandle = NULL; -static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *) = {0}; +static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0}; static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; int32_t mgmtInitShell() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_RETRIEVE, mgmtProcessRetrieveMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); tsMgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT"); @@ -84,9 +87,6 @@ int32_t mgmtInitShell() { return -1; } - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); - mPrint("server connection to shell is opened"); return 0; } @@ -104,7 +104,7 @@ void mgmtCleanUpShell() { } } -void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SRpcMsg *rpcMsg)) { +void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SQueuedMsg *queuedMsg)) { tsMgmtProcessShellMsgFp[showType] = fp; } @@ -117,107 +117,118 @@ void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) { } void mgmtProcessTranRequest(SSchedMsg *sched) { - SRpcMsg *rpcMsg = sched->msg; - (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg); - rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); + SQueuedMsg *queuedMsg = sched->msg; + (*tsMgmtProcessShellMsgFp[queuedMsg->msgType])(queuedMsg); + rpcFreeCont(queuedMsg->pCont); + free(queuedMsg); } -void mgmtAddToTranRequest(SRpcMsg *rpcMsg) { - SRpcMsg *queuedRpcMsg = malloc(sizeof(SRpcMsg)); - memcpy(queuedRpcMsg, rpcMsg, sizeof(SRpcMsg)); - +void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) { SSchedMsg schedMsg; - schedMsg.msg = queuedRpcMsg; + schedMsg.msg = queuedMsg; schedMsg.fp = mgmtProcessTranRequest; taosScheduleTask(tsMgmtTranQhandle, &schedMsg); } static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { if (sdbGetRunStatus() != SDB_STATUS_SERVING) { - mTrace("shell msg is ignored since SDB is not ready"); - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = TSDB_CODE_NOT_READY, .msgType = 0}; - rpcSendResponse(&rpcRsp); + mgmtProcessMsgWhileNotReady(rpcMsg); rpcFreeCont(rpcMsg->pCont); return; } - mTrace("%s is received", taosMsg[rpcMsg->msgType]); - if (tsMgmtProcessShellMsgFp[rpcMsg->msgType]) { - if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { - (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(rpcMsg); - rpcFreeCont(rpcMsg->pCont); - } else { - mgmtAddToTranRequest(rpcMsg); - } - } else { - mError("%s is not processed", taosMsg[rpcMsg->msgType]); + if (tsMgmtProcessShellMsgFp[rpcMsg->msgType] == NULL) { mgmtProcessUnSupportMsg(rpcMsg); rpcFreeCont(rpcMsg->pCont); + return; } -} -static void mgmtProcessShowMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + if (pUser == NULL) { + mError("thandle:%p, failed to retrieve user info", rpcMsg->handle); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER); + rpcFreeCont(rpcMsg->pCont); + return; + } + + if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { + SQueuedMsg queuedMsg = {0}; + queuedMsg.thandle = rpcMsg->handle; + queuedMsg.msgType = rpcMsg->msgType; + queuedMsg.contLen = rpcMsg->contLen; + queuedMsg.pCont = rpcMsg->pCont; + queuedMsg.pUser = pUser; + (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(&queuedMsg); + rpcFreeCont(rpcMsg->pCont); + } else { + SQueuedMsg *queuedMsg = calloc(1, sizeof(SQueuedMsg)); + queuedMsg->thandle = rpcMsg->handle; + queuedMsg->msgType = rpcMsg->msgType; + queuedMsg->contLen = rpcMsg->contLen; + queuedMsg->pCont = rpcMsg->pCont; + queuedMsg->pUser = pUser; + mgmtAddToShellQueue(queuedMsg); + } +} - SCMShowMsg *pShowMsg = rpcMsg->pCont; +static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { + SCMShowMsg *pShowMsg = pMsg->pCont; if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) { - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirect(pMsg->thandle)) { return; } } - int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; + if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE); + return; + } + + if (!tsMgmtShowMetaFp[pShowMsg->type]) { + mError("show type:%d %s is not support", pShowMsg->type, taosMsg[pShowMsg->type]); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT); + return; + } + + int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; SCMShowRsp *pShowRsp = rpcMallocCont(size); if (pShowRsp == NULL) { - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } - int32_t code; - if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { - code = TSDB_CODE_INVALID_MSG_TYPE; + SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen)); + pShow->signature = pShow; + pShow->type = pShowMsg->type; + pShow->payloadLen = htons(pShowMsg->payloadLen); + strcpy(pShow->db, pShowMsg->db); + memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); + + mgmtSaveQhandle(pShow); + pShowRsp->qhandle = htobe64((uint64_t) pShow); + + int32_t code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle); + if (code == 0) { + SRpcMsg rpcRsp = { + .handle = pMsg->thandle, + .pCont = pShowRsp, + .contLen = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns, + .code = code, + .msgType = 0 + }; + rpcSendResponse(&rpcRsp); } else { - SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen)); - pShow->signature = pShow; - pShow->type = pShowMsg->type; - strcpy(pShow->db, pShowMsg->db); - mTrace("pShow:%p is allocated", pShow); - - // set the table name query condition - pShow->payloadLen = htons(pShowMsg->payloadLen); - memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); - - mgmtSaveQhandle(pShow); - pShowRsp->qhandle = htobe64((uint64_t) pShow); - if (tsMgmtShowMetaFp[pShowMsg->type]) { - code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, rpcMsg->handle); - if (code == 0) { - size = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns; - } else { - mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, - taosMsg[(uint8_t) pShowMsg->type], code); - free(pShow); - } - } else { - code = TSDB_CODE_OPS_NOT_SUPPORT; - } + mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, taosMsg[pShowMsg->type], code); + mgmtFreeQhandle(pShow); + rpcFreeCont(pShowRsp); } - - rpcRsp.code = code; - rpcRsp.pCont = pShowRsp; - rpcRsp.contLen = size; - rpcSendResponse(&rpcRsp); } -static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - +static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { int32_t rowsToRead = 0; int32_t size = 0; int32_t rowsRead = 0; - SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) rpcMsg->pCont; + SRetrieveTableMsg *pRetrieve = pMsg->pCont; pRetrieve->qhandle = htobe64(pRetrieve->qhandle); /* @@ -226,16 +237,14 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { */ if (!mgmtCheckQhandle(pRetrieve->qhandle)) { mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle); - rpcRsp.code = TSDB_CODE_INVALID_QHANDLE; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_QHANDLE); return; } SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; - if (pShow->signature != (void *)pShow) { - mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature); - rpcRsp.code = TSDB_CODE_MEMORY_CORRUPTED; - rpcSendResponse(&rpcRsp); + if (!mgmtCheckQhandle(pShow)) { + mError("pShow:%p, query memory is corrupted", pShow); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED); return; } else { if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { @@ -258,10 +267,9 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { // if free flag is set, client wants to clean the resources if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) - rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, rpcMsg->handle); + rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->thandle); - if (rowsRead < 0) { - rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS; + if (rowsRead < 0) { // TSDB_CODE_ACTION_IN_PROGRESS; rpcFreeCont(pRsp); return; } @@ -269,8 +277,13 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { pRsp->numOfRows = htonl(rowsRead); pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision - rpcRsp.pCont = pRsp; - rpcRsp.contLen = size; + SRpcMsg rpcRsp = { + .handle = pMsg->thandle, + .pCont = pRsp, + .contLen = size, + .code = 0, + .msgType = 0 + }; rpcSendResponse(&rpcRsp); if (rowsToRead == 0) { @@ -278,21 +291,19 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { } } -static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; +static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { //SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont; //mgmtSaveQueryStreamList(pHBMsg); SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); if (pHBRsp == NULL) { - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } SRpcConnInfo connInfo; - if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { - mError("conn:%p is already released while process heart beat msg", rpcMsg->handle); + if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { + mError("conn:%p is already released while process heart beat msg", pMsg->thandle); return; } @@ -320,8 +331,13 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { pHBRsp->streamId = 0; pHBRsp->killConnection = 0; - rpcRsp.pCont = pHBRsp; - rpcRsp.contLen = sizeof(SCMHeartBeatRsp); + SRpcMsg rpcRsp = { + .handle = pMsg->thandle, + .pCont = pHBRsp, + .contLen = sizeof(SCMHeartBeatRsp), + .code = 0, + .msgType = 0 + }; rpcSendResponse(&rpcRsp); } @@ -340,13 +356,13 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr } } -static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) rpcMsg->pCont; +static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SCMConnectMsg *pConnectMsg = pMsg->pCont; SRpcConnInfo connInfo; - if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { - mError("conn:%p is already released while process connect msg", rpcMsg->handle); + if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { + mError("thandle:%p is already released while process connect msg", pMsg->thandle); return; } @@ -450,6 +466,7 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { } static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { + mError("%s is not processed", taosMsg[rpcMsg->msgType]); SRpcMsg rpcRsp = { .msgType = 0, .pCont = 0, @@ -459,3 +476,26 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { }; rpcSendResponse(&rpcRsp); } + +static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg) { + mTrace("%s is ignored since SDB is not ready", taosMsg[rpcMsg->msgType]); + SRpcMsg rpcRsp = { + .msgType = 0, + .pCont = 0, + .contLen = 0, + .code = TSDB_CODE_NOT_READY, + .handle = rpcMsg->handle + }; + rpcSendResponse(&rpcRsp); +} + +void mgmtSendSimpleResp(void *thandle, int32_t code) { + SRpcMsg rpcRsp = { + .msgType = 0, + .pCont = 0, + .contLen = 0, + .code = code, + .handle = thandle + }; + rpcSendResponse(&rpcRsp); +} diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 188083d516904165fde44c6a8a600f90200ca8bd..b1cdbef0b3fe02c6ba9a9ffeba2a3e955af3d8d6 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -44,14 +44,16 @@ extern void *tsNormalTableSdb; extern void *tsChildTableSdb; -static void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg); -static void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg); -static void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg); -static void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg); -static void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg); -static void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg); +static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg); +static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg); +static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); +static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtProcessSuperTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg); static int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); int32_t mgmtInitTables() { int32_t code = mgmtInitSuperTables(); @@ -79,6 +81,7 @@ int32_t mgmtInitTables() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_META, mgmtProcessSuperTableMetaMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); return TSDB_CODE_SUCCESS; } @@ -131,170 +134,56 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo return TSDB_CODE_SUCCESS; } +static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) { + SCMCreateTableMsg *pCreate = pMsg->pCont; + pCreate->numOfColumns = htons(pCreate->numOfColumns); + pCreate->numOfTags = htons(pCreate->numOfTags); + pCreate->sqlLen = htons(pCreate->sqlLen); -void mgmtProcessCreateVgroup(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { - SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SDbObj *pDb = mgmtGetDb(pCreate->db); - if (pDb == NULL) { - mError("table:%s, failed to create vgroup, db not found", pCreate->tableId); - rpcRsp.code = TSDB_CODE_INVALID_DB; - rpcSendResponse(&rpcRsp); - return; - } - - SVgObj *pVgroup = mgmtCreateVgroup(pDb); - if (pVgroup == NULL) { - mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId); - rpcRsp.code = TSDB_CODE_NO_ENOUGH_DNODES; - rpcSendResponse(&rpcRsp); - return; - } - - void *cont = rpcMallocCont(contLen); - if (cont == NULL) { - mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId); - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); - return; - } - - memcpy(cont, pCreate, contLen); - - SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); - info->type = TSDB_PROCESS_CREATE_VGROUP; - info->thandle = thandle; - info->ahandle = pVgroup; - info->cont = cont; - info->contLen = contLen; - - if (isGetMeta) { - info->type = TSDB_PROCESS_CREATE_VGROUP_GET_META; + SSchema *pSchema = (SSchema*) pCreate->schema; + for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { + pSchema->bytes = htons(pSchema->bytes); + pSchema->colId = i; + pSchema++; } - mgmtSendCreateVgroupMsg(pVgroup, info); -} - -//void mgmtSendCreateTableMsg(SMDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) { -// mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle); -// SRpcMsg rpcMsg = { -// .handle = ahandle, -// .pCont = pCreate, -// .contLen = htonl(pCreate->contLen), -// .code = 0, -// .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE -// }; -// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg); -//} -// - - -void mgmtProcessCreateTable(SVgObj *pVgroup, SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { - assert(pVgroup != NULL); - SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - int32_t sid = taosAllocateId(pVgroup->idPool); if (sid < 0) { - mTrace("table:%s, no enough sid in vgroup:%d, start to create a new vgroup", pCreate->tableId, pVgroup->vgId); - mgmtProcessCreateVgroup(pCreate, contLen, thandle, isGetMeta); + mTrace("thandle:%p, no enough sid in vgroup:%d, start to create a new one", pMsg->thandle, pVgroup->vgId); + mgmtCreateVgroup(pMsg); return; } + int32_t code; STableInfo *pTable; - SMDCreateTableMsg *pDCreate = NULL; + SMDCreateTableMsg *pMDCreate = NULL; if (pCreate->numOfColumns == 0) { - mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); - rpcRsp.code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); + mTrace("thandle:%p, create ctable:%s, vgroup:%d sid:%d ahandle:%p", pMsg->thandle, pCreate->tableId, pVgroup->vgId, sid, pMsg); + code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); } else { - mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); - rpcRsp.code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); + mTrace("thandle:%p, create ntable:%s, vgroup:%d sid:%d ahandle:%p", pMsg->thandle, pCreate->tableId, pVgroup->vgId, sid, pMsg); + code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); } - if (rpcRsp.code != TSDB_CODE_SUCCESS) { - mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid); - rpcSendResponse(&rpcRsp); + if (code != TSDB_CODE_SUCCESS) { + mTrace("thandle:%p, failed to create table:%s in vgroup:%d", pMsg->thandle, pCreate->tableId, pVgroup->vgId); + mgmtSendSimpleResp(pMsg->thandle, code); return; } - assert(pDCreate != NULL); - assert(pTable != NULL); - - SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); - info->type = TSDB_PROCESS_CREATE_TABLE; - info->thandle = thandle; - info->ahandle = pTable; SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - if (isGetMeta) { - info->type = TSDB_PROCESS_CREATE_TABLE_GET_META; - } - SRpcMsg rpcMsg = { - .handle = info, - .pCont = pCreate, - .contLen = htonl(pDCreate->contLen), - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE + .handle = pMsg, + .pCont = pCreate, + .contLen = htonl(pMDCreate->contLen), + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; mgmtSendMsgToDnode(&ipSet, &rpcMsg); } -int32_t mgmtCreateTable(SCMCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { - SDbObj *pDb = mgmtGetDb(pCreate->db); - if (pDb == NULL) { - mError("table:%s, failed to create table, db not selected", pCreate->tableId); - return TSDB_CODE_DB_NOT_SELECTED; - } - - STableInfo *pTable = mgmtGetTable(pCreate->tableId); - if (pTable != NULL) { - if (pCreate->igExists) { - mTrace("table:%s, table is already exist, think it success", pCreate->tableId); - return TSDB_CODE_SUCCESS; - } else { - mError("table:%s, failed to create table, table already exist", pCreate->tableId); - return TSDB_CODE_TABLE_ALREADY_EXIST; - } - } - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - assert(pAcct != NULL); - - int32_t code = mgmtCheckTableLimit(pAcct, pCreate->numOfColumns); - if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId); - return code; - } - - if (mgmtCheckExpired()) { - mError("table:%s, failed to create table, grant expired", pCreate->tableId); - return TSDB_CODE_GRANT_EXPIRED; - } - - if (pCreate->numOfTags != 0) { - mTrace("table:%s, start to create super table, tags:%d columns:%d", - pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns); - return mgmtCreateSuperTable(pDb, pCreate); - } - - code = mgmtCheckTimeSeries(pCreate->numOfColumns); - if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create table, timeseries exceed the limit", pCreate->tableId); - return TSDB_CODE_SUCCESS; - } - - SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb); - if (pVgroup == NULL) { - mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId); - mgmtProcessCreateVgroup(pCreate, contLen, thandle, isGetMeta); - } else { - mTrace("table:%s, try to create table in vgroup:%d", pCreate->tableId, pVgroup->vgId); - mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle, isGetMeta); - } - - return TSDB_CODE_ACTION_IN_PROGRESS; -} - int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { STableInfo *pTable = mgmtGetTable(tableId); if (pTable == NULL) { @@ -547,114 +436,145 @@ SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) { } void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) { + // TODO: if dirty, delete from sdb pTable->dirty = isDirty; } -void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; +void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { + SCMCreateTableMsg *pCreate = pMsg->pCont; + mTrace("thandle:%p, start to create table:%s", pMsg->thandle, pCreate->tableId); - SCMCreateTableMsg *pCreate = (SCMCreateTableMsg *) rpcMsg->pCont; - pCreate->numOfColumns = htons(pCreate->numOfColumns); - pCreate->numOfTags = htons(pCreate->numOfTags); - pCreate->sqlLen = htons(pCreate->sqlLen); + if (mgmtCheckRedirect(pMsg->thandle)) { + mError("thandle:%p, failed to create table:%s, need redirect", pMsg->thandle, pCreate->tableId); + return; + } - SSchema *pSchema = (SSchema*) pCreate->schema; - for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { - pSchema->bytes = htons(pSchema->bytes); - pSchema->colId = i; - pSchema++; + if (mgmtCheckExpired()) { + mError("thandle:%p, failed to create table:%s, grant expired", pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); + return; } - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create table, need redirect message", pCreate->tableId); + if (!pMsg->pUser->writeAuth) { + mError("thandle:%p, failed to create table:%s, no rights", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - mError("table:%s, failed to create table, invalid user", pCreate->tableId); - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + SAcctObj *pAcct = pMsg->pUser->pAcct; + int32_t code = mgmtCheckTableLimit(pAcct, htons(pCreate->numOfColumns)); + if (code != TSDB_CODE_SUCCESS) { + mError("thandle:%p, failed to create table:%s, exceed the limit", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } - if (!pUser->writeAuth) { - mError("table:%s, failed to create table, no rights", pCreate->tableId); - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + SDbObj *pDb = mgmtGetDb(pCreate->db); + if (pDb == NULL) { + mError("thandle:%p, failed to create table:%s, db not selected", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; } - int32_t code = mgmtCreateTable(pCreate, rpcMsg->contLen, rpcMsg->handle, false); - if (code != TSDB_CODE_ACTION_IN_PROGRESS) { - rpcRsp.code = code; - rpcSendResponse(&rpcRsp); + STableInfo *pTable = mgmtGetTable(pCreate->tableId); + if (pTable != NULL) { + if (pCreate->igExists) { + mTrace("thandle:%p, table:%s is already exist", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + return; + } else { + mError("thandle:%p, failed to create table:%s, table already exist", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_TABLE_ALREADY_EXIST); + return; + } + } + + if (pCreate->numOfTags != 0) { + mTrace("thandle:%p, start to create super table:%s, tags:%d columns:%d", + pMsg->thandle, pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns); + code = mgmtCreateSuperTable(pDb, pCreate); + mgmtSendSimpleResp(pMsg->thandle, code); + return; + } + + code = mgmtCheckTimeSeries(pCreate->numOfColumns); + if (code != TSDB_CODE_SUCCESS) { + mError("thandle:%p, failed to create table:%s, timeseries exceed the limit", pMsg->thandle, pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, code); + return; + } + + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; + + SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb); + if (pVgroup == NULL) { + mTrace("thandle:%p, table:%s start to create a new vgroup", pMsg->thandle, pCreate->tableId); + mgmtCreateVgroup(pMsg); + } else { + mTrace("thandle:%p, create table:%s in vgroup:%d", pMsg->thandle, pCreate->tableId, pVgroup->vgId); + mgmtCreateTable(pVgroup, pMsg); } } -void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SCMDropTableMsg *pDrop = (SCMDropTableMsg *) rpcMsg->pCont; +void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { + SCMDropTableMsg *pDrop = pMsg->pCont; - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to drop table, need redirect message", pDrop->tableId); + if (mgmtCheckRedirect(pMsg->thandle)) { + mError("thandle:%p, failed to drop table:%s, need redirect message", pMsg->thandle, pDrop->tableId); return; } - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { mError("table:%s, failed to drop table, invalid user", pDrop->tableId); - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } if (!pUser->writeAuth) { mError("table:%s, failed to drop table, no rights", pDrop->tableId); - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } SDbObj *pDb = mgmtGetDbByTableId(pDrop->tableId); if (pDb == NULL) { mError("table:%s, failed to drop table, db not selected", pDrop->tableId); - rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; } int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists); if (code != TSDB_CODE_ACTION_IN_PROGRESS) { - rpcRsp.code = code; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } } -void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { +void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) { return; } - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } - SCMAlterTableMsg *pAlter = (SCMAlterTableMsg *) rpcMsg->pCont; + SCMAlterTableMsg *pAlter = pMsg->pCont; + int32_t code; if (!pUser->writeAuth) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } else { pAlter->type = htons(pAlter->type); pAlter->numOfCols = htons(pAlter->numOfCols); if (pAlter->numOfCols > 2) { mError("table:%s error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); - rpcRsp.code = TSDB_CODE_APP_ERROR; + code = TSDB_CODE_APP_ERROR; } else { SDbObj *pDb = mgmtGetDb(pAlter->db); if (pDb) { @@ -662,17 +582,17 @@ void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) { pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); } - rpcRsp.code = mgmtAlterTable(pDb, pAlter); - if (rpcRsp.code == 0) { + code = mgmtAlterTable(pDb, pAlter); + if (code == 0) { mLPrint("table:%s is altered by %s", pAlter->tableId, pUser->user); } } else { - rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED; + code = TSDB_CODE_DB_NOT_SELECTED; } } } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { @@ -707,21 +627,14 @@ void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { rpcSendResponse(&rpcRsp); } - -void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp; - rpcRsp.handle = rpcMsg->handle; - rpcRsp.pCont = NULL; - rpcRsp.contLen = 0; - - SCMTableInfoMsg *pInfo = rpcMsg->pCont; +void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { + SCMTableInfoMsg *pInfo = pMsg->pCont; pInfo->createFlag = htons(pInfo->createFlag); - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { mError("table:%s, failed to get table meta, invalid user", pInfo->tableId); - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } @@ -729,12 +642,11 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { if (pTable == NULL) { if (pInfo->createFlag != 1) { mError("table:%s, failed to get table meta, table not exist", pInfo->tableId); - rpcRsp.code = TSDB_CODE_INVALID_TABLE; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } else { // on demand create table from super table if table does not exists - if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirect(pMsg->thandle)) { mError("table:%s, failed to create table while get meta info, need redirect message", pInfo->tableId); return; } @@ -743,8 +655,7 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); if (pCreateMsg == NULL) { mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId); - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } @@ -752,41 +663,34 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { strcpy(pCreateMsg->tableId, pInfo->tableId); mError("table:%s, start to create table while get meta info", pInfo->tableId); - mgmtCreateTable(pCreateMsg, contLen, rpcMsg->handle, true); +// mgmtCreateTable(pCreateMsg, contLen, pMsg->thandle, true); } } else { - mgmtProcessGetTableMeta(pTable, rpcMsg->handle); + mgmtProcessGetTableMeta(pTable, pMsg->thandle); } } -void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp; - rpcRsp.handle = rpcMsg->handle; - rpcRsp.pCont = NULL; - rpcRsp.contLen = 0; - +void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { SRpcConnInfo connInfo; - if (rpcGetConnInfo(rpcMsg->handle, &connInfo) != 0) { - mError("conn:%p is already released while get mulit table meta", rpcMsg->handle); + if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { + mError("conn:%p is already released while get mulit table meta", pMsg->thandle); return; } bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); SUserObj *pUser = mgmtGetUser(connInfo.user); if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } - SCMMultiTableInfoMsg *pInfo = rpcMsg->pCont; + SCMMultiTableInfoMsg *pInfo = pMsg->pCont; pInfo->numOfTables = htonl(pInfo->numOfTables); int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen); if (pMultiMeta == NULL) { - rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } @@ -823,29 +727,69 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { } } + SRpcMsg rpcRsp = {0}; + rpcRsp.handle = pMsg->thandle; rpcRsp.pCont = pMultiMeta; rpcRsp.contLen = pMultiMeta->contLen; rpcSendResponse(&rpcRsp); } -void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SCMSuperTableInfoMsg *pInfo = rpcMsg->pCont; +void mgmtProcessSuperTableMetaMsg(SQueuedMsg *pMsg) { + SCMSuperTableInfoMsg *pInfo = pMsg->pCont; STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); if (pTable == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_TABLE; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } SCMSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); if (pRsp != NULL) { int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); + SRpcMsg rpcRsp = {0}; + rpcRsp.handle = pMsg->thandle; rpcRsp.pCont = pRsp; rpcRsp.contLen = msgLen; rpcSendResponse(&rpcRsp); } else { - rpcRsp.code = TSDB_CODE_INVALID_TABLE; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); } -} \ No newline at end of file +} + +static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + + STableInfo *pTable = queueMsg->ahandle; + mTrace("thandle:%p, create table:%s rsp received, ahandle:%p code:%d received:%d", + queueMsg->thandle, pTable->tableId, rpcMsg->handle, rpcMsg->code, queueMsg->received); + + if (rpcMsg->code != TSDB_CODE_SUCCESS) { + mgmtSetTableDirty(pTable, true); + //sdbDeleteRow(tsVgroupSdb, pVgroup); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId, rpcMsg->code); + mgmtSetTableDirty(pTable, true); + } else { + mTrace("table:%s, created in dnode", pTable->tableId); + mgmtSetTableDirty(pTable, false); + + if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) { + SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); + newMsg->msgType = queueMsg->msgType; + newMsg->thandle = queueMsg->thandle; + newMsg->pDb = queueMsg->pDb; + newMsg->pUser = queueMsg->pUser; + newMsg->contLen = queueMsg->contLen; + newMsg->pCont = rpcMallocCont(newMsg->contLen); + memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + mTrace("table:%s, start to process get meta", pTable->tableId); + mgmtAddToShellQueue(newMsg); + } else { + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + } + } + + free(queueMsg); +} diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 821e553810d7f430dbe2eb8594eb31bf6e3ec34d..eb39c4c253aee719c7a3e9af15322c6bbcc74677 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -33,9 +33,9 @@ static int32_t mgmtUpdateUser(SUserObj *pUser); static int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg); -static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg); -static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg); +static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg); +static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg); +static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg); static void *(*mgmtUserActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); static void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize); @@ -337,52 +337,40 @@ SUserObj *mgmtGetUserFromConn(void *pConn) { return NULL; } -static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; - - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } +static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; + int32_t code; + SUserObj *pUser = pMsg->pUser; + if (pUser->superAuth) { - SCMCreateUserMsg *pCreate = rpcMsg->pCont; - rpcRsp.code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { + SCMCreateUserMsg *pCreate = pMsg->pCont; + code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass); + if (code == TSDB_CODE_SUCCESS) { mLPrint("user:%s is created by %s", pCreate->user, pUser->user); } } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } -static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pOperUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return; - } - - SCMAlterUserMsg *pAlter = rpcMsg->pCont; + int32_t code; + SUserObj *pOperUser = pMsg->pUser; + + SCMAlterUserMsg *pAlter = pMsg->pCont; SUserObj *pUser = mgmtGetUser(pAlter->user); if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return; } if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } @@ -405,13 +393,13 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { if (hasRight) { memset(pUser->pass, 0, sizeof(pUser->pass)); taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass); - rpcRsp.code = mgmtUpdateUser(pUser); - mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, rpcRsp.code); + code = mgmtUpdateUser(pUser); + mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, code); } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); return; } @@ -454,42 +442,34 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { pUser->writeAuth = 1; } - rpcRsp.code = mgmtUpdateUser(pUser); - mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, rpcRsp.code); + code = mgmtUpdateUser(pUser); + mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, code); } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); return; } - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); } -static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) { - SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mgmtCheckRedirect(rpcMsg->handle)) return; +static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) { + if (mgmtCheckRedirect(pMsg->thandle)) return; - SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle); - if (pOperUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); - return ; - } + int32_t code; + SUserObj *pOperUser = pMsg->pUser; - SCMDropUserMsg *pDrop = rpcMsg->pCont; + SCMDropUserMsg *pDrop = pMsg->pCont; SUserObj *pUser = mgmtGetUser(pDrop->user); if (pUser == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_USER; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); return ; } if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return ; } @@ -511,13 +491,13 @@ static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) { } if (hasRight) { - rpcRsp.code = mgmtDropUser(pUser->pAcct, pDrop->user); - if (rpcRsp.code == TSDB_CODE_SUCCESS) { + code = mgmtDropUser(pUser->pAcct, pDrop->user); + if (code == TSDB_CODE_SUCCESS) { mLPrint("user:%s is dropped by %s", pDrop->user, pUser->user); } } else { - rpcRsp.code = TSDB_CODE_NO_RIGHTS; + code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(&rpcRsp); + mgmtSendSimpleResp(pMsg->thandle, code); } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index c4a11bd11ea269ba5a1cdc2dacc99348c8297de3..8e3dd78ab90252c5e67fac95d234b37356b91252 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -24,6 +24,7 @@ #include "mgmtDb.h" #include "mgmtDClient.h" #include "mgmtDnode.h" +#include "mgmtProfile.h" #include "mgmtShell.h" #include "mgmtTable.h" #include "mgmtVgroup.h" @@ -42,6 +43,9 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg); + +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mgmtVgroupActionInit() { SVgObj tObj; @@ -114,6 +118,7 @@ int32_t mgmtInitVgroups() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); mTrace("vgroup is initialized"); return 0; @@ -123,19 +128,6 @@ SVgObj *mgmtGetVgroup(int32_t vgId) { return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); } -int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) { - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid < 0) { - mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool)); - pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; - mgmtCreateVgroup(pDb); - terrno = TSDB_CODE_ACTION_IN_PROGRESS; - } - - terrno = 0; - return sid; -} - /* * TODO: check if there is enough sids */ @@ -155,21 +147,25 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) { pDb->vgTimer = NULL; } -SVgObj *mgmtCreateVgroup(SDbObj *pDb) { +void mgmtCreateVgroup(SQueuedMsg *pMsg) { + SDbObj *pDb = pMsg->pDb; + if (pDb == NULL) { + mError("thandle:%p, failed to create vgroup, db not found", pMsg->thandle); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); + return; + } + SVgObj *pVgroup = (SVgObj *)calloc(sizeof(SVgObj), 1); strcpy(pVgroup->dbName, pDb->name); pVgroup->numOfVnodes = pDb->cfg.replications; - pVgroup->createdTime = taosGetTimestampMs(); - - // based on load balance, create a new one if (mgmtAllocVnodes(pVgroup) != 0) { - mError("db:%s, no enough free dnode to alloc %d vnodes", pDb->name, pVgroup->numOfVnodes); + mError("thandle:%p, db:%s no enough dnode to alloc %d vnodes", pMsg->thandle, pDb->name, pVgroup->numOfVnodes); free(pVgroup); - pDb->vgStatus = TSDB_VG_STATUS_FULL; - taosTmrReset(mgmtProcessVgTimer, 5000, pDb, tsMgmtTmr, &pDb->vgTimer); - return NULL; + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES); + return; } + pVgroup->createdTime = taosGetTimestampMs(); pVgroup->tableList = (STableInfo **) calloc(sizeof(STableInfo *), pDb->cfg.maxSessions); pVgroup->numOfTables = 0; pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); @@ -179,11 +175,14 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { sdbInsertRow(tsVgroupSdb, pVgroup, 0); - mTrace("vgroup:%d, vgroup is created, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); - for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) - mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); + mPrint("thandle:%p, vgroup:%d is created in mnode, db:%s replica:%d", pMsg->thandle, pVgroup->vgId, pDb->name, + pVgroup->numOfVnodes); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + mPrint("thandle:%p, vgroup:%d, dnode:%s vnode:%d", pMsg->thandle, pVgroup->vgId, + taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); + } - return pVgroup; + mgmtSendCreateVgroupMsg(pVgroup, pMsg); } int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { @@ -514,13 +513,13 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) return NULL; - SMDCreateVnodeMsg *pVPeers = rpcMallocCont(sizeof(SMDCreateVnodeMsg)); - if (pVPeers == NULL) return NULL; + SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg)); + if (pVnode == NULL) return NULL; - pVPeers->vnode = htonl(vnode); - pVPeers->cfg = pDb->cfg; + pVnode->vnode = htonl(vnode); + pVnode->cfg = pDb->cfg; - SVnodeCfg *pCfg = &pVPeers->cfg; + SVnodeCfg *pCfg = &pVnode->cfg; pCfg->vgId = htonl(pVgroup->vgId); pCfg->maxSessions = htonl(pCfg->maxSessions); pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize); @@ -534,13 +533,14 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); - SVnodeDesc *vpeerDesc = pVPeers->vpeerDesc; + SVnodeDesc *vpeerDesc = pVnode->vpeerDesc; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { - vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); + vpeerDesc[j].vgId = htonl(pVgroup->vgId); + vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); } - return pVPeers; + return pVnode; } SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) { @@ -558,7 +558,11 @@ SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) { } SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { - SRpcIpSet ipSet = {.numOfIps = pVgroup->numOfVnodes, .inUse = 0, .port = tsMnodeDnodePort + 1}; + SRpcIpSet ipSet = { + .numOfIps = pVgroup->numOfVnodes, + .inUse = 0, + .port = tsMnodeDnodePort + }; for (int i = 0; i < pVgroup->numOfVnodes; ++i) { ipSet.ip[i] = pVgroup->vnodeGid[i].ip; } @@ -566,7 +570,12 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { } SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { - SRpcIpSet ipSet = {.ip[0] = ip, .numOfIps = 1, .inUse = 0, .port = tsMnodeDnodePort + 1}; + SRpcIpSet ipSet = { + .ip[0] = ip, + .numOfIps = 1, + .inUse = 0, + .port = tsMnodeDnodePort + }; return ipSet; } @@ -574,19 +583,54 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup, vnode); SRpcMsg rpcMsg = { - .handle = ahandle, - .pCont = pCreate, - .contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0, - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE + .handle = ahandle, + .pCont = pCreate, + .contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0, + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE }; mgmtSendMsgToDnode(ipSet, &rpcMsg); } void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { - mTrace("vgroup:%d, send create all vnodes msg, handle:%p", pVgroup->vgId, ahandle); + mTrace("send create vgroup:%d msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); } +} + +static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + if (rpcMsg->code == TSDB_CODE_SUCCESS) { + queueMsg->code = rpcMsg->code; + queueMsg->successed++; + } + + SVgObj *pVgroup = queueMsg->ahandle; + mTrace("thandle:%p, vgroup:%d create vnode rsp received, ahandle:%p code:%d received:%d successed:%d expected:%d", + queueMsg->thandle, pVgroup->vgId, rpcMsg->handle, rpcMsg->code, queueMsg->received, queueMsg->successed, + queueMsg->expected); + + if (queueMsg->received != queueMsg->expected) return; + + if (queueMsg->received == queueMsg->successed) { + SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); + newMsg->msgType = queueMsg->msgType; + newMsg->thandle = queueMsg->thandle; + newMsg->pDb = queueMsg->pDb; + newMsg->pUser = queueMsg->pUser; + newMsg->contLen = queueMsg->contLen; + newMsg->pCont = rpcMallocCont(newMsg->contLen); + memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + mgmtAddToShellQueue(newMsg); + } else { + sdbDeleteRow(tsVgroupSdb, pVgroup); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + } + + free(queueMsg); } \ No newline at end of file