diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 68632dc41fac029fa62a1670619b54f5a5bb72b3..3bfaaa7305ec2fb30c7c069f8942cc5f4677ef7e 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -31,44 +31,49 @@ void (*dnodeInitMgmtIpFp)() = NULL; int32_t (*dnodeInitMgmtFp)() = NULL; void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; +void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) { - int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t)); - int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t)); - int8_t *pCont = sched->msg; - void *pConn = NULL; + int32_t contLen = *(int32_t *) (sched->msg - 4); + int32_t code = *(int32_t *) (sched->msg - 8); + int8_t msgType = *(int8_t *) (sched->msg - 9); + void *handle = sched->ahandle; + int8_t *pCont = sched->msg; - mgmtProcessMsgFromDnode(pCont, contLen, msgType, pConn); + mgmtProcessMsgFromDnode(pCont, contLen, handle, code); rpcFreeCont(sched->msg); } -void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) { +void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { dTrace("msg:%s is sent to mnode", taosMsg[msgType]); if (dnodeSendMsgToMnodeFp) { dnodeSendMsgToMnodeFp(msgType, pCont, contLen); } else { SSchedMsg schedMsg = {0}; - schedMsg.fp = dnodeSendMsgToMnodeQueueFp; - schedMsg.msg = pCont; - *(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType; - *(int32_t *) (pCont - sizeof(int8_t)) = contLen; + schedMsg.fp = dnodeSendMsgToMnodeQueueFp; + schedMsg.msg = pCont; + schedMsg.ahandle = ahandle; + *(int32_t *) (pCont - 4) = contLen; + *(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS; + *(int8_t *) (pCont - 9) = msgType; taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); } } void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) { dTrace("rsp:%s is sent to mnode", taosMsg[msgType]); - if (tsIsCluster) { - rpcSendResponse(pConn, code, pCont, contLen); + if (dnodeSendRspToMnodeFp) { + dnodeSendRspToMnodeFp(pConn, code, pCont, contLen); } else { SSchedMsg schedMsg = {0}; schedMsg.fp = dnodeSendMsgToMnodeFp; schedMsg.msg = pCont; - *(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType; - *(int32_t *) (pCont - sizeof(int8_t)) = contLen; + *(int32_t *) (pCont - 4) = contLen; + *(int32_t *) (pCont - 8) = code; + *(int8_t *) (pCont - 9) = msgType; taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); } } @@ -88,7 +93,7 @@ void dnodeInitMgmtIp() { } } -void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code) { +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *handle, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); } else { diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 65b17a88a0ba39b7d3d445557497f6cdef977ff1..894ef245fad099b21eb12e083dbeddfe3c8befd9 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -46,6 +46,11 @@ extern void (*dnodeParseParameterK)(); extern int32_t (*dnodeCheckSystem)(); +// dnodeSystem +extern void *tsDnodeMgmtQhandle; + +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code); + // dnodeModule extern void (*dnodeStartModules)(); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 70ded6a8151fb049fd0a7bc23181fd9097c13cc7..22d3b340d7efcefab6ab96bb9d539e35eb914b50 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -258,11 +258,13 @@ typedef struct { //mgmtSystem int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); -void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); +void mgmtProcessMsgFromDnode(char msgType, void *pCont, int contLen, void *pConn, int32_t code); extern int32_t (*mgmtInitSystem)(); extern void (*mgmtStopSystem)(); extern void (*mgmtCleanUpRedirect)(); + + #ifdef __cplusplus } #endif diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index f153a9a5739ebbae5c84b4a36065776472aa8083..6861d31b873a7898ae1e9812ade92cb25ff0c543 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -162,6 +162,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 121, "invalid table typee") #ifdef TAOS_ERROR_C }; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 211e8d10bb05980dee3238fbd4548388e6996260..e6202673bc0019f0dc9677ec76b180d22d3eb110 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -240,10 +240,11 @@ typedef struct { int16_t numOfTags; int32_t tagDataLen; int32_t sqlDataLen; + int32_t contLen; uint64_t createdTime; char tableId[TSDB_TABLE_ID_LEN + 1]; char superTableId[TSDB_TABLE_ID_LEN + 1]; - int8_t data[]; + char data[]; } SDCreateTableMsg; typedef struct { @@ -325,9 +326,13 @@ typedef struct { short vnode; int32_t sid; uint64_t uid; - char tableId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; } SDRemoveTableMsg; +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; +} SDRemoveSuperTableMsg; + typedef struct { int32_t vnode; } SFreeVnodeMsg; @@ -603,8 +608,8 @@ typedef struct { } SSecIe; typedef struct { - int32_t dnode; //the ID of dnode - int32_t vnode; //the index of vnode + uint32_t dnode; //the ip of dnode + int32_t vnode; //the index of vnode uint32_t ip; } SVPeerDesc; @@ -616,7 +621,7 @@ typedef struct { typedef struct { int32_t vnode; SVnodeCfg cfg; - SVPeerDesc vpeerDesc[]; + SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; } SVPeersMsg; typedef struct { @@ -732,11 +737,13 @@ typedef struct { } SCreateMnodeMsg, SDropMnodeMsg, SCreateDnodeMsg, SDropDnodeMsg; typedef struct { - int32_t vnode; - int32_t sid; + uint32_t dnode; + int32_t vnode; + int32_t sid; } STableCfgMsg; typedef struct { + uint32_t dnode; int32_t vnode; } SVpeerCfgMsg; diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 4ba19854cff2e849db585edbabbf7e956281cde0..79672d96d07f8355408e96a4810461e58927a377 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -35,7 +35,7 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); -int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup); +SCreateTableMsg *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable); int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 9ee3910abb626fa26aabc6564d846a49753f5f8b..6159d5e5dca0336fa979295da79d04688600e553 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -57,6 +57,7 @@ extern bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg); extern SDnodeObj tsDnodeObj; + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index 816d916a2a6fa7ca0585a113d6d2255edbadb406..19a7c7b034e082e16ce166c296a2afdc6c7dd7ce 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -26,23 +26,19 @@ extern "C" { extern void *mgmtStatusTimer; -int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup); -int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup); +void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle); +void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle); +void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle); +void mgmtSendVPeersMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *handle); +void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle); -int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup); -int mgmtSendVPeersMsg(SVgObj *pVgroup); -int mgmtSendFreeVnodeMsg(SVgObj *pVgroup); -int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid); -char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type); -char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type); - -extern int32_t (*mgmtSendSimpleRspToDnode)(void *pConn, int32_t msgType, int32_t code); -extern int32_t (*mgmtSendMsgToDnode)(int8_t *pCont, int32_t contLen, int8_t msgType); extern int32_t (*mgmtInitDnodeInt)(); extern void (*mgmtCleanUpDnodeInt)(); extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId); +void mgmtSendMsgToDnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle); +void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 84de446f619ea76ccc1fd1a24af0aea6537dd642..50e6991fed8b474beaa4af3973eacf3f7be3ea87 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -41,6 +41,9 @@ void mgmtCleanUpMeters(); void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); +SDCreateTableMsg *mgmtBuildCreateTableMsg(STableInfo *pTable); +SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); +SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 65bdd076655cacc8d0e21b2ebc0b064030b3a146..249d7d133cf8b7077ff6cf523d666351c9507b5c 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -27,6 +27,7 @@ extern "C" { int32_t mgmtInitVgroups(); void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); +SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); SVgObj *mgmtCreateVgroup(SDbObj *pDb); int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); @@ -41,6 +42,11 @@ SVgObj *mgmtGetAvailVgroup(SDbObj *pDb, int32_t *sid); void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable); void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); +SVPeersMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode); + +SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup); +SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip); + #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index a9454c689f363dfe6eb018e5a9bb24f60f550bfd..091243db278b89a7578573ed370e7539e7ceb570 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -273,7 +273,7 @@ void mgmtCleanUpChildTables() { sdbCloseTable(tsChildTableSdb); } -int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) { +SCreateTableMsg *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable) { // SCreateTableMsg *pCreateTable = (SCreateTableMsg *) pMsg; // memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); // memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN); @@ -360,7 +360,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { return TSDB_CODE_OTHERS; } - mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup); + mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); sdbDeleteRow(tsChildTableSdb, pTable); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index c34670330c3532751ddd69801545fc8518abadd6..7e9613d5b3261c578144d629f98b2e6dedbaa8d1 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -295,7 +295,7 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) { } } } - mgmtSendFreeVnodeMsg(pVgroup); + mgmtSendFreeVnodesMsg(pVgroup); pVgroup = pVgroup->next; } @@ -355,7 +355,7 @@ int32_t mgmtDropDb(SDbObj *pDb) { if (!finished) { SVgObj *pVgroup = pDb->pHead; while (pVgroup != NULL) { - mgmtSendFreeVnodeMsg(pVgroup); + mgmtSendFreeVnodesMsg(pVgroup); pVgroup = pVgroup->next; } return TSDB_CODE_ACTION_IN_PROGRESS; diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 95887af157925e1f3151f9c1d8c6c40ad15672ad..5a9e9aff358dfe686f23a4422c8200bc3bd87e36 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -22,6 +22,7 @@ #include "mgmtDnode.h" #include "mgmtBalance.h" #include "mgmtUser.h" +#include "mgmtVgroup.h" SDnodeObj tsDnodeObj; @@ -596,4 +597,5 @@ bool mgmtCheckConfigShowImp(SGlobalConfig *cfg) { return true; } -bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg) = mgmtCheckConfigShowImp; \ No newline at end of file +bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg) = mgmtCheckConfigShowImp; + diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 088f76ea510d4312556abb1d3aafb0789b5e51ab..d4294e64a43169622b17c7a848b17840174c87bc 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -15,428 +15,198 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "mnode.h" +#include "taoserror.h" +#include "tsched.h" +#include "tstatus.h" +#include "tsystem.h" +#include "tutil.h" #include "dnode.h" -#include "mgmtDnodeInt.h" +#include "mnode.h" #include "mgmtBalance.h" -#include "mgmtDnode.h" #include "mgmtDb.h" -#include "mgmtVgroup.h" +#include "mgmtDnode.h" +#include "mgmtDnodeInt.h" #include "mgmtTable.h" -#include "tutil.h" -#include "tstatus.h" -#include "tsystem.h" -#include "tsched.h" -#include "taoserror.h" -#include "dnodeSystem.h" -#include "mgmtChildTable.h" -#include "mgmtNormalTable.h" - -void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); -int mgmtSendVPeersMsg(SVgObj *pVgroup); -char *mgmtBuildVpeersIe(char *pMsg, SVgObj *pVgroup, int vnode); -//char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode); -extern void *tsDnodeMgmtQhandle; -void * mgmtStatusTimer = NULL; - -void mgmtSendMsgToDnodeImpFp(SSchedMsg *sched) { - int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t)); - int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t)); - int8_t *pCont = sched->msg; - void *pConn = NULL; - - dnodeProcessMsgFromMgmt(msgType, pCont, contLen, pConn, TSDB_CODE_SUCCESS); +#include "mgmtVgroup.h" + +void (*mgmtSendMsgToDnodeFp)(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL; +void (*mgmtSendRspToDnodeFp)(void *handle, int32_t code, void *pCont, int32_t contLen) = NULL; +void *mgmtStatusTimer = NULL; + +static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) { + int32_t contLen = *(int32_t *) (sched->msg - 4); + int32_t code = *(int32_t *) (sched->msg - 8); + int8_t msgType = *(int8_t *) (sched->msg - 9); + void *ahandle = sched->ahandle; + int8_t *pCont = sched->msg; + + dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code); rpcFreeCont(sched->msg); } -int32_t mgmtSendMsgToDnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) { +void mgmtSendMsgToDnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { mTrace("msg:%s is sent to dnode", taosMsg[msgType]); - *(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType; - *(int32_t *) (pCont - sizeof(int8_t)) = contLen; - - SSchedMsg schedMsg = {0}; - schedMsg.fp = mgmtSendMsgToDnodeImpFp; - schedMsg.msg = pCont; - - taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); + if (mgmtSendMsgToDnodeFp) { + mgmtSendMsgToDnodeFp(msgType, pCont, contLen, ahandle); + } else { + SSchedMsg schedMsg = {0}; + schedMsg.fp = mgmtSendMsgToDnodeQueueFp; + schedMsg.msg = pCont; + schedMsg.ahandle = ahandle; + *(int32_t *) (pCont - 4) = contLen; + *(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS; + *(int8_t *) (pCont - 9) = msgType; + taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); + } +} - return TSDB_CODE_SUCCESS; +void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) { + mTrace("rsp:%s is sent to dnode", taosMsg[msgType]); + if (mgmtSendRspToDnodeFp) { + mgmtSendRspToDnodeFp(pConn, code, pCont, contLen); + } else { + SSchedMsg schedMsg = {0}; + schedMsg.fp = mgmtSendMsgToDnodeQueueFp; + schedMsg.msg = pCont; + *(int32_t *) (pCont - 4) = contLen; + *(int32_t *) (pCont - 8) = code; + *(int8_t *) (pCont - 9) = msgType; + taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg); + } } -int32_t (*mgmtSendMsgToDnode)(int8_t *pCont, int32_t contLen, int8_t msgType) = mgmtSendMsgToDnodeImp; +static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { + STableCfgMsg *pCfg = (STableCfgMsg *) pCont; + pCfg->dnode = htonl(pCfg->dnode); + pCfg->vnode = htonl(pCfg->vnode); + pCfg->sid = htonl(pCfg->sid); + mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); + + if (!sdbMaster) { + mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); + mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); + return; + } -int32_t mgmtSendSimpleRspToDnodeImp(void *pConn, int32_t msgType, int32_t code) { - int8_t *pCont = rpcMallocCont(sizeof(int32_t)); - *(int32_t *) pCont = code; + STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); + if (pTable == NULL) { + mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); + mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0); + return; + } - mgmtSendMsgToDnodeImp(pCont, sizeof(int32_t), msgType); - return TSDB_CODE_SUCCESS; + mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); + mgmtSendCreateTableMsg(pTable, NULL); } -int32_t (*mgmtSendSimpleRspToDnode)(void *pConn, int32_t msgType, int32_t code) = mgmtSendSimpleRspToDnodeImp; - -int32_t mgmtProcessMeterCfgMsg(int8_t *pCont, int32_t contLen, void *pConn) { +static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { if (!sdbMaster) { - mgmtSendSimpleRspToDnode(pConn, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT); - return TSDB_CODE_REDIRECT; + mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); + return; } - STableCfgMsg *cfg = (STableCfgMsg *) pConn; - int32_t vnode = htonl(cfg->vnode); - int32_t sid = htonl(cfg->sid); + SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *) pCont; + pCfg->dnode = htonl(pCfg->dnode); + pCfg->vnode = htonl(pCfg->vnode); - STableInfo *pTable = mgmtGetTableByPos(0, vnode, sid); - if (pTable == NULL) { - mgmtSendSimpleRspToDnode(pConn, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_INVALID_TABLE); - return TSDB_CODE_INVALID_TABLE_ID; + SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode); + if (pVgroup == NULL) { + mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode); + mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_NOT_ACTIVE_VNODE, NULL, 0); + return; } - int8_t *pCreateTableMsg = NULL; - if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { - pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable); - } else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { - pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable); - } else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) { - pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable); - } else {} - - if (pCreateTableMsg != NULL) { - mgmtSendMsgToDnode(pCreateTableMsg, 0, TSDB_MSG_TYPE_TABLE_CFG_RSP); - return TSDB_CODE_SUCCESS; - } else { - mgmtSendSimpleRspToDnode(pConn, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_INVALID_TABLE); - return TSDB_CODE_INVALID_TABLE; - } + mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); + mgmtSendVPeersMsg(pVgroup, pCfg->vnode, NULL); } -int mgmtProcessVpeerCfgMsg(int8_t *pCont, int32_t contLen, void *pConn) { -// char * pMsg, *pStart; -// int msgLen = 0; -// SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *)cont; -// SVgObj * pVgroup = NULL; -// -// if (!sdbMaster) { -// mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT); -// return 0; -// } -// -// int vnode = htonl(pCfg->vnode); -// -// pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP); -// if (pStart == NULL) { -// mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// return 0; -// } -// pMsg = pStart; -// -// if (vnode < pObj->numOfVnodes) pVgroup = mgmtGetVgroup(pObj->vload[vnode].vgId); -// -// if (pVgroup) { -// *pMsg = 0; -// pMsg++; -// pMsg = mgmtBuildVpeersIe(pMsg, pVgroup, vnode); -// mTrace("dnode:%s, vnode:%d, vgroup:%d, send create vnode msg, code:%d", taosIpStr(pObj->privateIp), vnode, pVgroup->vgId, *pMsg); -// } else { -// mTrace("dnode:%s, vnode:%d, no vgroup info, vgroup:%d", taosIpStr(pObj->privateIp), vnode, pObj->vload[vnode].vgId); -// *pMsg = TSDB_CODE_NOT_ACTIVE_VNODE; -// pMsg++; -// *(int32_t *)pMsg = htonl(vnode); -// pMsg += sizeof(int32_t); -// } -// -// msgLen = pMsg - pStart; -// mgmtSendMsgToDnode(pObj, pStart, msgLen); +static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { + mTrace("create table rsp received, handle:%p code:%d", thandle, code); +} - return 0; +static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { + mTrace("remove table rsp received, handle:%p code:%d", thandle, code); } -int mgmtProcessCreateRsp(int8_t *pCont, int32_t contLen, void *pConn) { return 0; } - -int mgmtProcessFreeVnodeRsp(int8_t *pCont, int32_t contLen, void *pConn) { return 0; } - -int mgmtProcessVPeersRsp(int8_t *pCont, int32_t contLen, void *pConn) { -// STaosRsp *pRsp = (STaosRsp *)msg; -// -// if (!sdbMaster) { -// mgmtSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT); -// return 0; -// } -// -// SDbObj *pDb = mgmtGetDb(pRsp->more); -// if (!pDb) { -// mError("dnode:%s, db:%s not find, code:%d", taosIpStr(pObj->privateIp), pRsp->more, pRsp->code); -// return 0; -// } -// -// if (pDb->vgStatus != TSDB_VG_STATUS_IN_PROGRESS) { -// mTrace("dnode:%s, db:%s vpeer rsp already disposed, vgroup status:%s code:%d", -// taosIpStr(pObj->privateIp), pRsp->more, taosGetVgroupStatusStr(pDb->vgStatus), pRsp->code); -// return 0; -// } -// -// if (pRsp->code == TSDB_CODE_SUCCESS) { -// pDb->vgStatus = TSDB_VG_STATUS_READY; -// mTrace("dnode:%s, db:%s vgroup is created in dnode", taosIpStr(pObj->privateIp), pRsp->more); -// return 0; -// } -// -// pDb->vgStatus = pRsp->code; -// mError("dnode:%s, db:%s vgroup init failed, code:%d %s", -// taosIpStr(pObj->privateIp), pRsp->more, pRsp->code, taosGetVgroupStatusStr(pDb->vgStatus)); +static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { + mTrace("free vnode rsp received, handle:%p code:%d", thandle, code); +} - return 0; +static void mgmtProcessVPeersRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { + mTrace("vpeers rsp received, handle:%p code:%d", thandle, code); } -void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) { +void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *handle, int32_t code) { if (msgType == TSDB_MSG_TYPE_TABLE_CFG) { - mgmtProcessMeterCfgMsg(pCont, contLen, pConn); + mgmtProcessTableCfgMsg(msgType, pCont, contLen, handle); } else if (msgType == TSDB_MSG_TYPE_VNODE_CFG) { - mgmtProcessVpeerCfgMsg(pCont, contLen, pConn); + mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, handle); } else if (msgType == TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP) { - mgmtProcessCreateRsp(pCont, contLen, pConn); + mgmtProcessCreateTableRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) { - // do nothing + mgmtProcessRemoveTableRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) { - mgmtProcessVPeersRsp(pCont, contLen, pConn); + mgmtProcessVPeersRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) { - mgmtProcessFreeVnodeRsp(pCont, contLen, pConn); + mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { - // do nothing; } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { - // do nothing; } else { mError("%s from dnode is not processed", taosMsg[msgType]); } } -int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) { -// uint64_t timeStamp = taosGetTimestampMs(); -// -// for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) { -// SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip); -// if (pObj == NULL) { -// continue; -// } -// -// int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000); -// if (pStart == NULL) { -// continue; -// } -// -// int8_t *pMsg = mgmtBuildCreateChildTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode, tagDataLen, pTagData); -// int32_t msgLen = pMsg - pStart; -// -// mgmtSendMsgToDnode(pObj, pStart, msgLen); -// } -// -// pVgroup->lastCreate = timeStamp; - return 0; -} +void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) { + mTrace("table:%s, sid:%d send create table msg, handle:%p", pTable->tableId, pTable->sid); - -int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { -// uint64_t timeStamp = taosGetTimestampMs(); -// -// for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) { -// SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip); -// if (pObj == NULL) { -// continue; -// } -// -// int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000); -// if (pStart == NULL) { -// continue; -// } -// -// int8_t *pMsg = mgmtBuildCreateNormalTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode); -// int32_t msgLen = pMsg - pStart; -// -// mgmtSendMsgToDnode(pObj, pStart, msgLen); -// } -// -// pVgroup->lastCreate = timeStamp; -// return 0; - return 0; -} - -int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup) { -// SDRemoveTableMsg *pRemove; -// char * pMsg, *pStart; -// int i, msgLen = 0; -// SDnodeObj * pObj; -// char ipstr[20]; -// uint64_t timeStamp; -// -// timeStamp = taosGetTimestampMs(); -// -// for (i = 0; i < pVgroup->numOfVnodes; ++i) { -// //if (pVgroup->vnodeGid[i].ip == 0) continue; -// -// pObj = mgmtGetDnode(pVgroup->vnodeGid[i].ip); -// if (pObj == NULL) continue; -// -// pStart = taosBuildReqMsgToDnode(pObj, TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE); -// if (pStart == NULL) continue; -// pMsg = pStart; -// -// pRemove = (SDRemoveTableMsg *)pMsg; -// pRemove->vnode = htons(pVgroup->vnodeGid[i].vnode); -// pRemove->sid = htonl(pTable->gid.sid); -// memcpy(pRemove->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); -// -// pMsg += sizeof(SDRemoveTableMsg); -// msgLen = pMsg - pStart; -// -// mgmtSendMsgToDnode(pObj, pStart, msgLen); -// -// tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip); -// mTrace("dnode:%s vid:%d, send remove meter msg, sid:%d status:%d", ipstr, pVgroup->vnodeGid[i].vnode, -// pTable->gid.sid, pObj->status); -// } -// -// pVgroup->lastRemove = timeStamp; - - return 0; + SDCreateTableMsg *pCreate = mgmtBuildCreateTableMsg(pTable); + if (pCreate != NULL) { + mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), handle); + } } -int mgmtSendAlterStreamMsgToDnode(void *pTable, SVgObj *pVgroup) { -// SAlterStreamMsg *pAlter; -// char * pMsg, *pStart; -// int i, msgLen = 0; -// SDnodeObj * pObj; -// -// for (i = 0; i < pVgroup->numOfVnodes; ++i) { -// if (pVgroup->vnodeGid[i].ip == 0) continue; -// -// pObj = mgmtGetDnode(pVgroup->vnodeGid[i].ip); -// if (pObj == NULL) continue; -// -// pStart = taosBuildReqMsgToDnode(pObj, TSDB_MSG_TYPE_ALTER_STREAM); -// if (pStart == NULL) continue; -// pMsg = pStart; -// -// pAlter = (SAlterStreamMsg *)pMsg; -// pAlter->vnode = htons(pVgroup->vnodeGid[i].vnode); -// pAlter->sid = htonl(pTable->gid.sid); -// pAlter->uid = pTable->uid; -// pAlter->status = pTable->status; -// -// pMsg += sizeof(SAlterStreamMsg); -// msgLen = pMsg - pStart; -// -// mgmtSendMsgToDnode(pObj, pStart, msgLen); -// } - - return 0; -} +void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) { + mTrace("table:%s, sid:%d send remove table msg, handle:%p", pTable->tableId, pTable->sid); -char *mgmtBuildVpeersIe(char *pMsg, SVgObj *pVgroup, int vnode) { - SVPeersMsg *pVPeers = (SVPeersMsg *)pMsg; - SDbObj * pDb; - - pDb = mgmtGetDb(pVgroup->dbName); - pVPeers->vnode = htonl(vnode); - - pVPeers->cfg = pDb->cfg; - SVnodeCfg *pCfg = &pVPeers->cfg; - pCfg->vgId = htonl(pVgroup->vgId); - pCfg->maxSessions = htonl(pCfg->maxSessions); - pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize); - pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks); - pCfg->daysPerFile = htonl(pCfg->daysPerFile); - pCfg->daysToKeep1 = htonl(pCfg->daysToKeep1); - pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); - pCfg->daysToKeep = htonl(pCfg->daysToKeep); - pCfg->commitTime = htonl(pCfg->commitTime); - pCfg->blocksPerTable = htons(pCfg->blocksPerTable); - pCfg->replications = (char)pVgroup->numOfVnodes; - pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); - - SVPeerDesc *vpeerDesc = pVPeers->vpeerDesc; - - pMsg = (char *)(pVPeers->vpeerDesc); - - for (int j = 0; j < pVgroup->numOfVnodes; ++j) { - vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); - vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); - pMsg += sizeof(SVPeerDesc); + SDRemoveTableMsg *pRemove = mgmtBuildRemoveTableMsg(pTable); + if (pRemove != NULL) { + mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), handle); } +} - return pMsg; +void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) { + mTrace("table:%s, sid:%d send alter stream msg, handle:%p", pTable->tableId, pTable->sid); } -int mgmtSendVPeersMsg(SVgObj *pVgroup) { -// SDnodeObj *pDnode; -// char * pMsg, *pStart; -// int msgLen = 0; -// -// for (int i = 0; i < pVgroup->numOfVnodes; ++i) { -// pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].ip); -// if (pDnode == NULL) { -// mError("dnode:%s not there", taosIpStr(pVgroup->vnodeGid[i].ip)); -// continue; -// } -// -// pDnode->vload[pVgroup->vnodeGid[i].vnode].vgId = pVgroup->vgId; -// mgmtUpdateDnode(pDnode); -// -// if (pDnode->thandle && pVgroup->numOfVnodes >= 1) { -// pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_DNODE_VPEERS); -// if (pStart == NULL) continue; -// pMsg = mgmtBuildVpeersIe(pStart, pVgroup, pVgroup->vnodeGid[i].vnode); -// msgLen = pMsg - pStart; -// -// mgmtSendMsgToDnode(pDnode, pStart, msgLen); -// } -// } +void mgmtSendVPeersMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *handle) { + mTrace("vgroup:%d, vnode:%d send vpeer msg, handle:%p", pVgroup->vgId, vnode, handle); - return 0; + SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); + if (pVpeer != NULL) { + mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), handle); + } } -int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid) { -// SFreeVnodeMsg *pFreeVnode; -// char * pMsg, *pStart; -// int msgLen = 0; -// SDnodeObj * pDnode; -// -// pDnode = mgmtGetDnode(pVnodeGid->ip); -// if (pDnode == NULL) { -// mError("dnode:%s not there", taosIpStr(pVnodeGid->ip)); -// return -1; -// } -// -// if (pDnode->thandle == NULL) { -// mTrace("dnode:%s offline, failed to send Vpeer msg", taosIpStr(pVnodeGid->ip)); -// return -1; -// } -// -// pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_DNODE_FREE_VNODE); -// if (pStart == NULL) return -1; -// pMsg = pStart; -// -// pFreeVnode = (SFreeVnodeMsg *)pMsg; -// pFreeVnode->vnode = htons(pVnodeGid->vnode); -// -// pMsg += sizeof(SFreeVnodeMsg); -// -// msgLen = pMsg - pStart; -// mgmtSendMsgToDnode(pDnode, pStart, msgLen); +void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle) { + mTrace("vnode:%d send free vnode msg, handle:%p", vnode, handle); - return 0; + SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg)); + if (pFreeVnode != NULL) { + pFreeVnode->vnode = htonl(vnode); + mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), handle); + } } -int mgmtSendFreeVnodeMsg(SVgObj *pVgroup) { - for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid + i); +void mgmtSendFreeVnodesMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *handle) { + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); + mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid + i, &ipSet, handle); } - - return 0; } -int mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { +int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { char *option, *value; - int olen, valen; + int32_t olen, valen; paGetToken(msg, &option, &olen); if (strncasecmp(option, "unremove", 8) == 0) { @@ -445,7 +215,7 @@ int mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { } else if (strncasecmp(option, "score", 5) == 0) { paGetToken(option + olen + 1, &value, &valen); if (valen > 0) { - int score = atoi(value); + int32_t score = atoi(value); mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score); pDnode->customScore = score; mgmtUpdateDnode(pDnode); @@ -455,7 +225,7 @@ int mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { } else if (strncasecmp(option, "bandwidth", 9) == 0) { paGetToken(msg, &value, &valen); if (valen > 0) { - int bandwidthMb = atoi(value); + int32_t bandwidthMb = atoi(value); if (bandwidthMb >= 0 && bandwidthMb < 10000000) { mTrace("dnode:%s, bandwidth(Mb) set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->bandwidthMb, bandwidthMb); pDnode->bandwidthMb = bandwidthMb; @@ -469,10 +239,10 @@ int mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { return -1; } -int mgmtSendCfgDnodeMsg(char *cont) { +int32_t mgmtSendCfgDnodeMsg(char *cont) { #ifdef CLUSTER char * pMsg, *pStart; - int msgLen = 0; + int32_t msgLen = 0; #endif SDnodeObj *pDnode; @@ -487,7 +257,7 @@ int mgmtSendCfgDnodeMsg(char *cont) { } mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config); - int code = mgmtCfgDynamicOptions(pDnode, pCfg->config); + int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config); if (code != -1) { return code; } @@ -524,7 +294,7 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { taosGetSysMemory(&memoryUsedMB); pObj->diskAvailable = tsAvailDataDirGB; - for (int vnode = 0; vnode < pObj->numOfVnodes; ++vnode) { + for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) { SVnodeLoad *pVload = &(pObj->vload[vnode]); SVnodeObj * pVnode = vnodeList + vnode; diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index adb14560edb85f07c4a220a16298320223328502..e7211e2530957cc29f7baec570df2ee91be281f8 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -387,7 +387,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { return TSDB_CODE_OTHERS; } - mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup); + mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); sdbDeleteRow(tsNormalTableSdb, pTable); diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index b9e80529026121a77a150a69fab57fd55e3b1ac9..3c19753931bf68dc195ce273f4b71c6668e1ce55 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -390,3 +390,31 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void * return numOfRows; } + +SDCreateTableMsg *mgmtBuildCreateTableMsg(STableInfo *pTable) { + SDCreateTableMsg *pCreate = NULL; + if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { + pCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); + } else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { + pCreate = mgmtBuildCreateChildTableMsg((SChildTableObj *) pTable); + } else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) { + pCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); + } else { + } + + return pCreate; +} + +SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) { + SDRemoveTableMsg *pRemove = NULL; + if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { + pRemove = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); + } else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { + pRemove = mgmtBuildCreateChildTableMsg((SChildTableObj *) pTable); + } else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) { + pRemove = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); + } else { + } + + return pRemove; +} \ No newline at end of file diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 1cef4492476cff3aba03c89855355e8944963841..8f7ea729d114064ec83568fc715fbb8d163e0949 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -234,7 +234,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { } mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); - mgmtSendFreeVnodeMsg(pVgroup); + mgmtSendFreeVnodesMsg(pVgroup); sdbDeleteRow(tsVgroupSdb, pVgroup); return 0; @@ -521,3 +521,63 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) { pVgroup->tableList[pTable->sid] = NULL; taosFreeId(pVgroup->idPool, pTable->sid); } + +SVPeersMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode) { + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + if (pDb == NULL) return NULL; + + SVPeersMsg *pVPeers = rpcMallocCont(sizeof(SVPeersMsg)); + if (pVPeers == NULL) return NULL; + + pVPeers->vnode = htonl(vnode); + pVPeers->cfg = pDb->cfg; + + SVnodeCfg *pCfg = &pVPeers->cfg; + pCfg->vgId = htonl(pVgroup->vgId); + pCfg->maxSessions = htonl(pCfg->maxSessions); + pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize); + pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks); + pCfg->daysPerFile = htonl(pCfg->daysPerFile); + pCfg->daysToKeep1 = htonl(pCfg->daysToKeep1); + pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); + pCfg->daysToKeep = htonl(pCfg->daysToKeep); + pCfg->commitTime = htonl(pCfg->commitTime); + pCfg->blocksPerTable = htons(pCfg->blocksPerTable); + pCfg->replications = (char) pVgroup->numOfVnodes; + pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); + + SVPeerDesc *vpeerDesc = pVPeers->vpeerDesc; + for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { + vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); + vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); + } + + return pVPeers; +} + +SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) { + if (vnode < 0 || vnode >= TSDB_MAX_VNODES) { + return NULL; + } + + SDnodeObj *pDnode = mgmtGetDnode(dnode); + if (pDnode == NULL) { + return NULL; + } + + int32_t vgId = pDnode->vload[vnode].vgId; + return mgmtGetVgroup(vgId); +} + +SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { + SRpcIpSet ipSet = {.numOfIps = pVgroup->numOfVnodes, .inUse = 0, .port = tsMgmtDnodePort + 1}; + for (int i = 0; i < pVgroup->numOfVnodes; ++i) { + ipSet.ip[i] = pVgroup->vnodeGid[i].ip; + } + return ipSet; +} + +SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { + SRpcIpSet ipSet = {.ip = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1}; + return ipSet; +} \ No newline at end of file