diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index aae3bb31186d67d1bbafc36fd3336b68d37699e7..5158b12ae295be94c71b0936300cdd9de76dff38 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -26,7 +26,7 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeInitMgmtIp(); -void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code); +void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 3bfaaa7305ec2fb30c7c069f8942cc5f4677ef7e..d26574f7cb3e7bca5205259127852e7f89d2f38f 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -43,11 +43,11 @@ static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) { void *handle = sched->ahandle; int8_t *pCont = sched->msg; - mgmtProcessMsgFromDnode(pCont, contLen, handle, code); + mgmtProcessMsgFromDnode(msgType, pCont, contLen, handle, code); rpcFreeCont(sched->msg); } -void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { +void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) { dTrace("msg:%s is sent to mnode", taosMsg[msgType]); if (dnodeSendMsgToMnodeFp) { dnodeSendMsgToMnodeFp(msgType, pCont, contLen); @@ -55,7 +55,6 @@ void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen, void *aha SSchedMsg schedMsg = {0}; schedMsg.fp = dnodeSendMsgToMnodeQueueFp; schedMsg.msg = pCont; - schedMsg.ahandle = ahandle; *(int32_t *) (pCont - 4) = contLen; *(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS; *(int8_t *) (pCont - 9) = msgType; @@ -69,7 +68,7 @@ void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, dnodeSendRspToMnodeFp(pConn, code, pCont, contLen); } else { SSchedMsg schedMsg = {0}; - schedMsg.fp = dnodeSendMsgToMnodeFp; + schedMsg.fp = dnodeSendMsgToMnodeQueueFp; schedMsg.msg = pCont; *(int32_t *) (pCont - 4) = contLen; *(int32_t *) (pCont - 8) = code; @@ -93,7 +92,7 @@ void dnodeInitMgmtIp() { } } -void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *handle, int32_t code) { +void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); } else { @@ -221,7 +220,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) { } cfg->vnode = htonl(vnode); - dnodeSendMsgToMnode((int8_t*)cfg, sizeof(SVpeerCfgMsg), TSDB_MSG_TYPE_VNODE_CFG); + dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg)); } void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { @@ -231,7 +230,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { } cfg->vnode = htonl(vnode); - dnodeSendMsgToMnode((int8_t*)cfg, sizeof(STableCfgMsg), TSDB_MSG_TYPE_TABLE_CFG); + dnodeSendMsgToMnode(TSDB_MSG_TYPE_TABLE_CFG, cfg, sizeof(STableCfgMsg)); } void dnodeInitProcessShellMsg() { diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 894ef245fad099b21eb12e083dbeddfe3c8befd9..45ff014dc51fa92d1abfb335751d3c44f61246a0 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -49,7 +49,7 @@ extern int32_t (*dnodeCheckSystem)(); // dnodeSystem extern void *tsDnodeMgmtQhandle; -void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int contLen, void *pConn, int32_t code); +void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); // dnodeModule extern void (*dnodeStartModules)(); diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index 19a7c7b034e082e16ce166c296a2afdc6c7dd7ce..2e0ff6ab4c21a9bd3dc60700c568e33f4d5f4195 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -26,18 +26,19 @@ extern "C" { extern void *mgmtStatusTimer; -void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle); -void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle); -void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle); -void mgmtSendVPeersMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *handle); -void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle); +void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle); extern int32_t (*mgmtInitDnodeInt)(); extern void (*mgmtCleanUpDnodeInt)(); extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId); -void mgmtSendMsgToDnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle); +void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle); void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 65d2a02cd6f4bd6600e90b009712858eca3bbc70..94e4f44bcc1a86658fe02100849af012795eb974 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -38,6 +38,23 @@ int32_t mgmtKillStream(char *qidstr, void *pConn); int32_t mgmtKillConnection(char *qidstr, void *pConn); +enum { + TSDB_PROCESS_CREATE_TABLE, + TSDB_PROCESS_CREATE_VGROUP, + TSDB_PROCESS_CREATE_VGROUP_AND_TABLE, + TSDB_PROCESS_CREATE_VNODE, + TSDB_PROCESS_TABLE_CFG, +}; + +typedef struct { + void *thandle; // come from uplayer + void *ahandle; // object to process + void *cont; // additional information of object to process + int32_t type; // the type of sync process + int32_t received; // num of received, such as numOfVnodes + int32_t contLen; // the length of additional information +} SProcessInfo; + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 50e6991fed8b474beaa4af3973eacf3f7be3ea87..c3c003fe0fa18891caa6a60b6a26527812760096 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -31,7 +31,7 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate); +int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 091243db278b89a7578573ed370e7539e7ceb570..9f2acd779228bc8edeeec2fa687469eb4ef4d383 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -31,6 +31,7 @@ #include "mgmtDb.h" #include "mgmtDnodeInt.h" #include "mgmtGrant.h" +#include "mgmtProfile.h" #include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtVgroup.h" @@ -339,19 +340,16 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr mError("table:%s, corresponding super table schema is null", pCreate->tableId); return TSDB_CODE_INVALID_TABLE; } - // memcpy(schema, pTagData + TSDB_TABLE_ID_LEN + 1, size); if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { mError("table:%s, update sdb error", pCreate->tableId); return TSDB_CODE_SDB_ERROR; } - mgmtSendCreateTableMsg(pTable, pVgroup); - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); - return 0; + return TSDB_CODE_SUCCESS; } int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { @@ -360,7 +358,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { return TSDB_CODE_OTHERS; } - mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); +// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); sdbDeleteRow(tsChildTableSdb, pTable); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 7e9613d5b3261c578144d629f98b2e6dedbaa8d1..3ce254d6c6900536584c30a97c4b6494853ab82e 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -295,7 +295,7 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) { } } } - mgmtSendFreeVnodesMsg(pVgroup); +// mgmtSendFreeVnodesMsg(pVgroup); pVgroup = pVgroup->next; } @@ -465,7 +465,7 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { //rebuild meterList in mgmtVgroup.c mgmtUpdateVgroup(pVgroup); } - mgmtSendVPeersMsg(pVgroup); +// mgmtSendCreateVnodeMsg(pVgroup); pVgroup = pVgroup->next; } mgmtStartBalanceTimer(10); diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index d4294e64a43169622b17c7a848b17840174c87bc..c051af171e16cf0169df437f58906bfb7711e53c 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -26,10 +26,11 @@ #include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtDnodeInt.h" +#include "mgmtProfile.h" #include "mgmtTable.h" #include "mgmtVgroup.h" -void (*mgmtSendMsgToDnodeFp)(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL; +void (*mgmtSendMsgToDnodeFp)(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL; void (*mgmtSendRspToDnodeFp)(void *handle, int32_t code, void *pCont, int32_t contLen) = NULL; void *mgmtStatusTimer = NULL; @@ -44,10 +45,10 @@ static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) { rpcFreeCont(sched->msg); } -void mgmtSendMsgToDnode(int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { +void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) { mTrace("msg:%s is sent to dnode", taosMsg[msgType]); if (mgmtSendMsgToDnodeFp) { - mgmtSendMsgToDnodeFp(msgType, pCont, contLen, ahandle); + mgmtSendMsgToDnodeFp(ipSet, msgType, pCont, contLen, ahandle); } else { SSchedMsg schedMsg = {0}; schedMsg.fp = mgmtSendMsgToDnodeQueueFp; @@ -75,7 +76,7 @@ void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, } } -static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { +static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) { STableCfgMsg *pCfg = (STableCfgMsg *) pCont; pCfg->dnode = htonl(pCfg->dnode); pCfg->vnode = htonl(pCfg->vnode); @@ -84,19 +85,21 @@ static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe if (!sdbMaster) { mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); - mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); + mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); return; } STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); if (pTable == NULL) { mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); - mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0); + mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0); return; } - mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); - mgmtSendCreateTableMsg(pTable, NULL); + mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); + + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); + mgmtSendCreateTableMsg(pTable, &ipSet, NULL); } static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { @@ -117,11 +120,22 @@ static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe } mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); - mgmtSendVPeersMsg(pVgroup, pCfg->vnode, NULL); + + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); + mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL); } static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { mTrace("create table rsp received, handle:%p code:%d", thandle, code); + if (thandle == NULL) return; + + SProcessInfo *info = thandle; + if (info->type == TSDB_PROCESS_CREATE_TABLE) { + rpcSendResponse(info->thandle, code, NULL, 0); + } else { + mError("create table rsp received, handle:%p code:%d, invalid type:%d", info->type); + rpcSendResponse(info->thandle, TSDB_CODE_INVALID_MSG, NULL, 0); + } } static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { @@ -132,8 +146,59 @@ static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contL mTrace("free vnode rsp received, handle:%p code:%d", thandle, code); } -static void mgmtProcessVPeersRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { - mTrace("vpeers rsp received, handle:%p code:%d", thandle, code); +static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *ahandle, int32_t code) { + mTrace("create vnode received, vgroup_info:%p code:%d", ahandle, code); + if (ahandle == NULL) { + rpcFreeCont(pCont); + return; + } + + SProcessInfo *vgroup_info = ahandle; + assert(vgroup_info->type == TSDB_PROCESS_CREATE_VGROUP); + + vgroup_info->received++; + SVgObj *pVgroup = vgroup_info->ahandle; + mTrace("vgroup:%d, received:%d numOfVnodes:%d table_info:%p", + pVgroup->vgId, vgroup_info->received, pVgroup->numOfVnodes, vgroup_info->thandle); + if (vgroup_info->received == pVgroup->numOfVnodes) { + SProcessInfo *table_info = vgroup_info->thandle; + assert(table_info->type == TSDB_PROCESS_CREATE_VGROUP_AND_TABLE); + + STableInfo *pTable = table_info->ahandle; + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_TABLE; + info->thandle = table_info->thandle; + info->ahandle = pTable; + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + mgmtSendCreateTableMsg(pTable, &ipSet, info); + + free(vgroup_info); + free(table_info); + } + + rpcFreeCont(pCont); +} + +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *table_info) { + SProcessInfo *vgroup_info = calloc(1, sizeof(SProcessInfo)); + vgroup_info->type = TSDB_PROCESS_CREATE_VGROUP; + vgroup_info->thandle = table_info; + vgroup_info->ahandle = pVgroup; + + mTrace("vgroup:%d, send create all vnodes msg, table_info:%p vgroup_info:%p ", pVgroup->vgId, table_info, vgroup_info); + for (int i = 0; i < pVgroup->numOfVnodes; ++i) { + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); + mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, vgroup_info); + } +} + +void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *vgroup_info) { + mTrace("vgroup:%d, send create vnode:%d msg, vgroup_info:%p", pVgroup->vgId, vnode, vgroup_info); + + SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); + if (pVpeer != NULL) { + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), vgroup_info); + } } void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *handle, int32_t code) { @@ -146,61 +211,52 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *h } else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) { mgmtProcessRemoveTableRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) { - mgmtProcessVPeersRsp(msgType, pCont, contLen, handle, code); + mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) { mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, handle, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { } else { - mError("%s from dnode is not processed", taosMsg[msgType]); + mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]); } } -void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) { +void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send create table msg, handle:%p", pTable->tableId, pTable->sid); SDCreateTableMsg *pCreate = mgmtBuildCreateTableMsg(pTable); if (pCreate != NULL) { - mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), handle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle); } } -void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) { +void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send remove table msg, handle:%p", pTable->tableId, pTable->sid); SDRemoveTableMsg *pRemove = mgmtBuildRemoveTableMsg(pTable); if (pRemove != NULL) { - mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), handle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), ahandle); } } -void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *handle) { +void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send alter stream msg, handle:%p", pTable->tableId, pTable->sid); } -void mgmtSendVPeersMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *handle) { - mTrace("vgroup:%d, vnode:%d send vpeer msg, handle:%p", pVgroup->vgId, vnode, handle); - - SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); - if (pVpeer != NULL) { - mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), handle); - } -} - void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle) { mTrace("vnode:%d send free vnode msg, handle:%p", vnode, handle); SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg)); if (pFreeVnode != NULL) { pFreeVnode->vnode = htonl(vnode); - mgmtSendMsgToDnode(TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), handle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), handle); } } void mgmtSendFreeVnodesMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *handle) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid + i, &ipSet, handle); + mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, handle); } } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index e7211e2530957cc29f7baec570df2ee91be281f8..a9ab627298b0dcdb8b7fc35e88ec71f00b00c45d 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -273,7 +273,7 @@ int32_t mgmtInitNormalTables() { continue; } - mgmtAddTableIntoVgroup(pVgroup, pTable); + mgmtAddTableIntoVgroup(pVgroup, (STableInfo *)pTable); //pVgroup->tableList[pTable->sid] = (STableInfo*)pTable; taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1); @@ -373,12 +373,10 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg return TSDB_CODE_SDB_ERROR; } - mgmtSendCreateNormalTableMsg(pTable, pVgroup); - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); - return 0; + return TSDB_CODE_SUCCESS; } int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { @@ -387,7 +385,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { return TSDB_CODE_OTHERS; } - mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); +// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); sdbDeleteRow(tsNormalTableSdb, pTable); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index a2a195d0c6858ccb23ac2ba3de2060ca63124414..8fbf2241b622f9ed9981029e6cf60764a82dd8ca 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -155,7 +155,7 @@ int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); strcpy(pCreateMsg->tableId, pInfo->tableId); - int32_t code = mgmtCreateTable(pDb, pCreateMsg); + int32_t code = mgmtCreateTable(pDb, pCreateMsg, NULL); char stableName[TSDB_TABLE_ID_LEN] = {0}; strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN); @@ -803,7 +803,7 @@ int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb) { - code = mgmtCreateTable(pDb, pCreate); + code = mgmtCreateTable(pDb, pCreate, ahandle); } else { code = TSDB_CODE_DB_NOT_SELECTED; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 3c19753931bf68dc195ce273f4b71c6668e1ce55..6472c8737d8d6786cf01bbe1bd9240cc9afda8ae 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -33,6 +33,7 @@ #include "mgmtDnodeInt.h" #include "mgmtGrant.h" #include "mgmtNormalTable.h" +#include "mgmtProfile.h" #include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtUser.h" @@ -110,7 +111,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo return TSDB_CODE_SUCCESS; } -int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { +int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle) { STableInfo *pTable = mgmtGetTable(pCreate->tableId); if (pTable != NULL) { if (pCreate->igExists) { @@ -143,13 +144,30 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { int32_t sid; SVgObj *pVgroup = mgmtGetAvailVgroup(pDb, &sid); if (pVgroup == NULL) { - // process it in a callback function - return TSDB_CODE_ACTION_IN_PROGRESS; + code = mgmtCreateVgroup(pDb); + if (code == TSDB_CODE_SUCCESS) { + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_VGROUP_AND_TABLE; + info->ahandle = pTable; + info->thandle = thandle; + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + mgmtSendCreateVgroupMsg(pVgroup, &ipSet, info); + return TSDB_CODE_ACTION_IN_PROGRESS; + } } else { if (pCreate->numOfColumns == 0) { - return mgmtCreateChildTable(pDb, pCreate, pVgroup, sid); + code = mgmtCreateChildTable(pDb, pCreate, pVgroup, sid); } else { - return mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid); + code = mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid); + } + if (code == TSDB_CODE_SUCCESS) { + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_TABLE; + info->ahandle = pTable; + info->thandle = thandle; + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + mgmtSendCreateTableMsg(pTable, &ipSet, info); + return TSDB_CODE_ACTION_IN_PROGRESS; } } } else { diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 8f7ea729d114064ec83568fc715fbb8d163e0949..79792ee20e68449c02f7c69bbcf82c0640207db1 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -216,8 +216,6 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); - mgmtSendVPeersMsg(pVgroup); - return pVgroup; } @@ -234,7 +232,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { } mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); - mgmtSendFreeVnodesMsg(pVgroup); +// mgmtSendFreeVnodesMsg(pVgroup); sdbDeleteRow(tsVgroupSdb, pVgroup); return 0; @@ -578,6 +576,6 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { } SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { - SRpcIpSet ipSet = {.ip = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1}; + SRpcIpSet ipSet = {.ip[0] = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1}; return ipSet; } \ No newline at end of file diff --git a/src/util/inc/tsched.h b/src/util/inc/tsched.h index b46a0c455ab50a0b42decb7f700438507e9192b8..c9ce6b388f9374bf21abab9036c4f0707d547f14 100644 --- a/src/util/inc/tsched.h +++ b/src/util/inc/tsched.h @@ -25,7 +25,7 @@ typedef struct _sched_msg { void (*tfp)(void *, void *); - int8_t *msg; + void *msg; void *ahandle; void *thandle; } SSchedMsg; diff --git a/src/util/inc/tsystem.h b/src/util/inc/tsystem.h index 9261261e1b860f51d731a8ba911ce07e3bcaa5a3..3ffa2312969d212f7cac79ab22a8965fed6fd736 100644 --- a/src/util/inc/tsystem.h +++ b/src/util/inc/tsystem.h @@ -23,7 +23,7 @@ extern "C" { #include #include -extern char dataDir[TSDB_FILENAME_LEN]; +extern char dataDir[]; bool taosGetSysMemory(float *memoryUsedMB);