From aa84e61e58626c912e17fdd7099f967fbf46db1f Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 27 Feb 2020 17:04:04 +0800 Subject: [PATCH] communication from dnode to mgmt --- src/dnode/inc/dnodeMgmt.h | 2 +- src/dnode/src/dnodeMgmt.c | 13 ++-- src/inc/dnode.h | 2 +- src/mnode/inc/mgmtDnodeInt.h | 13 ++-- src/mnode/inc/mgmtProfile.h | 17 +++++ src/mnode/inc/mgmtTable.h | 2 +- src/mnode/src/mgmtChildTable.c | 8 +-- src/mnode/src/mgmtDb.c | 4 +- src/mnode/src/mgmtDnodeInt.c | 114 ++++++++++++++++++++++++-------- src/mnode/src/mgmtNormalTable.c | 8 +-- src/mnode/src/mgmtShell.c | 4 +- src/mnode/src/mgmtTable.c | 28 ++++++-- src/mnode/src/mgmtVgroup.c | 6 +- src/util/inc/tsched.h | 2 +- src/util/inc/tsystem.h | 2 +- 15 files changed, 155 insertions(+), 70 deletions(-) diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index aae3bb3118..5158b12ae2 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -26,7 +26,7 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeInitMgmtIp(); -void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code); +void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 3bfaaa7305..d26574f7cb 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -43,11 +43,11 @@ static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) { void *handle = sched->ahandle; int8_t *pCont = sched->msg; - mgmtProcessMsgFromDnode(pCont, contLen, handle, code); + mgmtProcessMsgFromDnode(msgType, pCont, contLen, handle, code); rpcFreeCont(sched->msg); } -void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { +void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) { dTrace("msg:%s is sent to mnode", taosMsg[msgType]); if (dnodeSendMsgToMnodeFp) { dnodeSendMsgToMnodeFp(msgType, pCont, contLen); @@ -55,7 +55,6 @@ void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen, void *aha SSchedMsg schedMsg = {0}; schedMsg.fp = dnodeSendMsgToMnodeQueueFp; schedMsg.msg = pCont; - schedMsg.ahandle = ahandle; *(int32_t *) (pCont - 4) = contLen; *(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS; *(int8_t *) (pCont - 9) = msgType; @@ -69,7 +68,7 @@ void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, dnodeSendRspToMnodeFp(pConn, code, pCont, contLen); } else { SSchedMsg schedMsg = {0}; - schedMsg.fp = dnodeSendMsgToMnodeFp; + schedMsg.fp = dnodeSendMsgToMnodeQueueFp; schedMsg.msg = pCont; *(int32_t *) (pCont - 4) = contLen; *(int32_t *) (pCont - 8) = code; @@ -93,7 +92,7 @@ void dnodeInitMgmtIp() { } } -void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *handle, int32_t code) { +void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); } else { @@ -221,7 +220,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) { } cfg->vnode = htonl(vnode); - dnodeSendMsgToMnode((int8_t*)cfg, sizeof(SVpeerCfgMsg), TSDB_MSG_TYPE_VNODE_CFG); + dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg)); } void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { @@ -231,7 +230,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { } cfg->vnode = htonl(vnode); - dnodeSendMsgToMnode((int8_t*)cfg, sizeof(STableCfgMsg), TSDB_MSG_TYPE_TABLE_CFG); + dnodeSendMsgToMnode(TSDB_MSG_TYPE_TABLE_CFG, cfg, sizeof(STableCfgMsg)); } void dnodeInitProcessShellMsg() { diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 894ef245fa..45ff014dc5 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -49,7 +49,7 @@ extern int32_t (*dnodeCheckSystem)(); // dnodeSystem extern void *tsDnodeMgmtQhandle; -void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code); +void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); // dnodeModule extern void (*dnodeStartModules)(); diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index 19a7c7b034..2e0ff6ab4c 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -26,18 +26,19 @@ extern "C" { extern void *mgmtStatusTimer; -void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle); -void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle); -void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle); -void mgmtSendVPeersMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *handle); -void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle); +void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle); extern int32_t (*mgmtInitDnodeInt)(); extern void (*mgmtCleanUpDnodeInt)(); extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId); -void mgmtSendMsgToDnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle); +void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle); void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 65d2a02cd6..94e4f44bcc 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -38,6 +38,23 @@ int32_t mgmtKillStream(char *qidstr, void *pConn); int32_t mgmtKillConnection(char *qidstr, void *pConn); +enum { + TSDB_PROCESS_CREATE_TABLE, + TSDB_PROCESS_CREATE_VGROUP, + TSDB_PROCESS_CREATE_VGROUP_AND_TABLE, + TSDB_PROCESS_CREATE_VNODE, + TSDB_PROCESS_TABLE_CFG, +}; + +typedef struct { + void *thandle; // come from uplayer + void *ahandle; // object to process + void *cont; // additional information of object to process + int32_t type; // the type of sync process + int32_t received; // num of received, such as numOfVnodes + int32_t contLen; // the length of additional information +} SProcessInfo; + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 50e6991fed..c3c003fe0f 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -31,7 +31,7 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate); +int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 091243db27..9f2acd7792 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -31,6 +31,7 @@ #include "mgmtDb.h" #include "mgmtDnodeInt.h" #include "mgmtGrant.h" +#include "mgmtProfile.h" #include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtVgroup.h" @@ -339,19 +340,16 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr mError("table:%s, corresponding super table schema is null", pCreate->tableId); return TSDB_CODE_INVALID_TABLE; } - // memcpy(schema, pTagData + TSDB_TABLE_ID_LEN + 1, size); if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { mError("table:%s, update sdb error", pCreate->tableId); return TSDB_CODE_SDB_ERROR; } - mgmtSendCreateTableMsg(pTable, pVgroup); - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); - return 0; + return TSDB_CODE_SUCCESS; } int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { @@ -360,7 +358,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { return TSDB_CODE_OTHERS; } - mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); +// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); sdbDeleteRow(tsChildTableSdb, pTable); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 7e9613d5b3..3ce254d6c6 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -295,7 +295,7 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) { } } } - mgmtSendFreeVnodesMsg(pVgroup); +// mgmtSendFreeVnodesMsg(pVgroup); pVgroup = pVgroup->next; } @@ -465,7 +465,7 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { //rebuild meterList in mgmtVgroup.c mgmtUpdateVgroup(pVgroup); } - mgmtSendVPeersMsg(pVgroup); +// mgmtSendCreateVnodeMsg(pVgroup); pVgroup = pVgroup->next; } mgmtStartBalanceTimer(10); diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index d4294e64a4..c051af171e 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -26,10 +26,11 @@ #include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtDnodeInt.h" +#include "mgmtProfile.h" #include "mgmtTable.h" #include "mgmtVgroup.h" -void (*mgmtSendMsgToDnodeFp)(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL; +void (*mgmtSendMsgToDnodeFp)(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL; void (*mgmtSendRspToDnodeFp)(void *handle, int32_t code, void *pCont, int32_t contLen) = NULL; void *mgmtStatusTimer = NULL; @@ -44,10 +45,10 @@ static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) { rpcFreeCont(sched->msg); } -void mgmtSendMsgToDnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { +void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { mTrace("msg:%s is sent to dnode", taosMsg[msgType]); if (mgmtSendMsgToDnodeFp) { - mgmtSendMsgToDnodeFp(msgType, pCont, contLen, ahandle); + mgmtSendMsgToDnodeFp(ipSet, msgType, pCont, contLen, ahandle); } else { SSchedMsg schedMsg = {0}; schedMsg.fp = mgmtSendMsgToDnodeQueueFp; @@ -75,7 +76,7 @@ void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, } } -static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { +static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) { STableCfgMsg *pCfg = (STableCfgMsg *) pCont; pCfg->dnode = htonl(pCfg->dnode); pCfg->vnode = htonl(pCfg->vnode); @@ -84,19 +85,21 @@ static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe if (!sdbMaster) { mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); - mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); + mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); return; } STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); if (pTable == NULL) { mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); - mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0); + mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0); return; } - mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); - mgmtSendCreateTableMsg(pTable, NULL); + mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); + + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); + mgmtSendCreateTableMsg(pTable, &ipSet, NULL); } static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { @@ -117,11 +120,22 @@ static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe } mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); - mgmtSendVPeersMsg(pVgroup, pCfg->vnode, NULL); + + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); + mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL); } static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { mTrace("create table rsp received, handle:%p code:%d", thandle, code); + if (thandle == NULL) return; + + SProcessInfo *info = thandle; + if (info->type == TSDB_PROCESS_CREATE_TABLE) { + rpcSendResponse(info->thandle, code, NULL, 0); + } else { + mError("create table rsp received, handle:%p code:%d, invalid type:%d", info->type); + rpcSendResponse(info->thandle, TSDB_CODE_INVALID_MSG, NULL, 0); + } } static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { @@ -132,8 +146,59 @@ static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contL mTrace("free vnode rsp received, handle:%p code:%d", thandle, code); } -static void mgmtProcessVPeersRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { - mTrace("vpeers rsp received, handle:%p code:%d", thandle, code); +static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *ahandle, int32_t code) { + mTrace("create vnode received, vgroup_info:%p code:%d", ahandle, code); + if (ahandle == NULL) { + rpcFreeCont(pCont); + return; + } + + SProcessInfo *vgroup_info = ahandle; + assert(vgroup_info->type == TSDB_PROCESS_CREATE_VGROUP); + + vgroup_info->received++; + SVgObj *pVgroup = vgroup_info->ahandle; + mTrace("vgroup:%d, received:%d numOfVnodes:%d table_info:%p", + pVgroup->vgId, vgroup_info->received, pVgroup->numOfVnodes, vgroup_info->thandle); + if (vgroup_info->received == pVgroup->numOfVnodes) { + SProcessInfo *table_info = vgroup_info->thandle; + assert(table_info->type == TSDB_PROCESS_CREATE_VGROUP_AND_TABLE); + + STableInfo *pTable = table_info->ahandle; + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_TABLE; + info->thandle = table_info->thandle; + info->ahandle = pTable; + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + mgmtSendCreateTableMsg(pTable, &ipSet, info); + + free(vgroup_info); + free(table_info); + } + + rpcFreeCont(pCont); +} + +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *table_info) { + SProcessInfo *vgroup_info = calloc(1, sizeof(SProcessInfo)); + vgroup_info->type = TSDB_PROCESS_CREATE_VGROUP; + vgroup_info->thandle = table_info; + vgroup_info->ahandle = pVgroup; + + mTrace("vgroup:%d, send create all vnodes msg, table_info:%p vgroup_info:%p ", pVgroup->vgId, table_info, vgroup_info); + for (int i = 0; i < pVgroup->numOfVnodes; ++i) { + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); + mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, vgroup_info); + } +} + +void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *vgroup_info) { + mTrace("vgroup:%d, send create vnode:%d msg, vgroup_info:%p", pVgroup->vgId, vnode, vgroup_info); + + SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); + if (pVpeer != NULL) { + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), vgroup_info); + } } void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *handle, int32_t code) { @@ -146,61 +211,52 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *h } else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) { mgmtProcessRemoveTableRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) { - mgmtProcessVPeersRsp(msgType, pCont, contLen, handle, code); + mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) { mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { } else { - mError("%s from dnode is not processed", taosMsg[msgType]); + mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]); } } -void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) { +void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send create table msg, handle:%p", pTable->tableId, pTable->sid); SDCreateTableMsg *pCreate = mgmtBuildCreateTableMsg(pTable); if (pCreate != NULL) { - mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), handle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle); } } -void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) { +void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send remove table msg, handle:%p", pTable->tableId, pTable->sid); SDRemoveTableMsg *pRemove = mgmtBuildRemoveTableMsg(pTable); if (pRemove != NULL) { - mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), handle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), ahandle); } } -void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) { +void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send alter stream msg, handle:%p", pTable->tableId, pTable->sid); } -void mgmtSendVPeersMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *handle) { - mTrace("vgroup:%d, vnode:%d send vpeer msg, handle:%p", pVgroup->vgId, vnode, handle); - - SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); - if (pVpeer != NULL) { - mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), handle); - } -} - void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle) { mTrace("vnode:%d send free vnode msg, handle:%p", vnode, handle); SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg)); if (pFreeVnode != NULL) { pFreeVnode->vnode = htonl(vnode); - mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), handle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), handle); } } void mgmtSendFreeVnodesMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *handle) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid + i, &ipSet, handle); + mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, handle); } } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index e7211e2530..a9ab627298 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -273,7 +273,7 @@ int32_t mgmtInitNormalTables() { continue; } - mgmtAddTableIntoVgroup(pVgroup, pTable); + mgmtAddTableIntoVgroup(pVgroup, (STableInfo *)pTable); //pVgroup->tableList[pTable->sid] = (STableInfo*)pTable; taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1); @@ -373,12 +373,10 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg return TSDB_CODE_SDB_ERROR; } - mgmtSendCreateNormalTableMsg(pTable, pVgroup); - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); - return 0; + return TSDB_CODE_SUCCESS; } int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { @@ -387,7 +385,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { return TSDB_CODE_OTHERS; } - mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); +// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); sdbDeleteRow(tsNormalTableSdb, pTable); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index a2a195d0c6..8fbf2241b6 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -155,7 +155,7 @@ int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); strcpy(pCreateMsg->tableId, pInfo->tableId); - int32_t code = mgmtCreateTable(pDb, pCreateMsg); + int32_t code = mgmtCreateTable(pDb, pCreateMsg, NULL); char stableName[TSDB_TABLE_ID_LEN] = {0}; strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN); @@ -803,7 +803,7 @@ int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb) { - code = mgmtCreateTable(pDb, pCreate); + code = mgmtCreateTable(pDb, pCreate, ahandle); } else { code = TSDB_CODE_DB_NOT_SELECTED; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 3c19753931..6472c8737d 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -33,6 +33,7 @@ #include "mgmtDnodeInt.h" #include "mgmtGrant.h" #include "mgmtNormalTable.h" +#include "mgmtProfile.h" #include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtUser.h" @@ -110,7 +111,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo return TSDB_CODE_SUCCESS; } -int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { +int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle) { STableInfo *pTable = mgmtGetTable(pCreate->tableId); if (pTable != NULL) { if (pCreate->igExists) { @@ -143,13 +144,30 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { int32_t sid; SVgObj *pVgroup = mgmtGetAvailVgroup(pDb, &sid); if (pVgroup == NULL) { - // process it in a callback function - return TSDB_CODE_ACTION_IN_PROGRESS; + code = mgmtCreateVgroup(pDb); + if (code == TSDB_CODE_SUCCESS) { + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_VGROUP_AND_TABLE; + info->ahandle = pTable; + info->thandle = thandle; + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + mgmtSendCreateVgroupMsg(pVgroup, &ipSet, info); + return TSDB_CODE_ACTION_IN_PROGRESS; + } } else { if (pCreate->numOfColumns == 0) { - return mgmtCreateChildTable(pDb, pCreate, pVgroup, sid); + code = mgmtCreateChildTable(pDb, pCreate, pVgroup, sid); } else { - return mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid); + code = mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid); + } + if (code == TSDB_CODE_SUCCESS) { + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_TABLE; + info->ahandle = pTable; + info->thandle = thandle; + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + mgmtSendCreateTableMsg(pTable, &ipSet, info); + return TSDB_CODE_ACTION_IN_PROGRESS; } } } else { diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 8f7ea729d1..79792ee20e 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -216,8 +216,6 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); - mgmtSendVPeersMsg(pVgroup); - return pVgroup; } @@ -234,7 +232,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { } mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); - mgmtSendFreeVnodesMsg(pVgroup); +// mgmtSendFreeVnodesMsg(pVgroup); sdbDeleteRow(tsVgroupSdb, pVgroup); return 0; @@ -578,6 +576,6 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { } SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { - SRpcIpSet ipSet = {.ip = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1}; + SRpcIpSet ipSet = {.ip[0] = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1}; return ipSet; } \ No newline at end of file diff --git a/src/util/inc/tsched.h b/src/util/inc/tsched.h index b46a0c455a..c9ce6b388f 100644 --- a/src/util/inc/tsched.h +++ b/src/util/inc/tsched.h @@ -25,7 +25,7 @@ typedef struct _sched_msg { void (*tfp)(void *, void *); - int8_t *msg; + void *msg; void *ahandle; void *thandle; } SSchedMsg; diff --git a/src/util/inc/tsystem.h b/src/util/inc/tsystem.h index 9261261e1b..3ffa231296 100644 --- a/src/util/inc/tsystem.h +++ b/src/util/inc/tsystem.h @@ -23,7 +23,7 @@ extern "C" { #include #include -extern char dataDir[TSDB_FILENAME_LEN]; +extern char dataDir[]; bool taosGetSysMemory(float *memoryUsedMB); -- GitLab