diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index e3a7f99a5fbf48257d6605b903f0e24f17c67f4c..76bb8aa524f8e703dc7693215d54c1bbba6de18f 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -21,6 +21,8 @@ #include "tutil.h" #include "dnode.h" #include "dnodeMClient.h" +#include "dnodeModule.h" +#include "dnodeMClient.h" static bool dnodeReadMnodeIpList(); static void dnodeSaveMnodeIpList(); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index caff618048b04a070874584a1dd27a7c7df868a2..e1c836a0ea383d72a26098369ec88c30d057824d 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -497,8 +497,8 @@ static void dnodeReadDnodeId() { int32_t num = 0; fscanf(fp, "%s %d", option, &value); - if (num != 2) return false; - if (strcmp(option, "dnodeId") != 0) return false; + if (num != 2) return; + if (strcmp(option, "dnodeId") != 0) return; tsDnodeId = value;; fclose(fp); @@ -510,16 +510,12 @@ static void dnodeSaveDnodeId() { sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); FILE *fp = fopen(dnodeIdFile, "w"); - if (!fp) { - return false; - } + if (!fp) return; fprintf(fp, "dnodeId %d\n", tsDnodeId); fclose(fp); dPrint("save dnodeId successed"); - - return true; } void dnodeUpdateDnodeId(int32_t dnodeId) { diff --git a/src/inc/mnode.h b/src/inc/mnode.h index d42d2f45f0929d67686487b45d7964b099e9f096..4cae1d5c678100e5341d50318b05899e448e64d6 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -42,46 +42,38 @@ extern "C" { typedef struct { int32_t dnodeId; uint32_t privateIp; - int32_t sid; + uint32_t publicIp; uint32_t moduleStatus; - int32_t openVnodes; - int32_t numOfVnodes; - int32_t numOfFreeVnodes; int64_t createdTime; - uint32_t publicIp; - int32_t status; uint32_t lastAccess; - uint32_t rebootTimes; - uint32_t lastReboot; // time stamp for last reboot + int32_t openVnodes; + int32_t numOfTotalVnodes; // from dnode status msg, config information + uint32_t rack; + uint16_t idc; + uint16_t slot; uint16_t numOfCores; // from dnode status msg - uint8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode - uint8_t reserveStatus; - uint16_t numOfTotalVnodes; // from dnode status msg, config information - uint16_t unused; + int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode + int8_t lbStatus; // set in balance function + float lbScore; // calc in balance function + int32_t customScore; // config by user + char dnodeName[TSDB_DNODE_NAME_LEN + 1]; + char reserved[7]; + char updateEnd[1]; + SVnodeLoad vload[TSDB_MAX_VNODES]; + int32_t status; + uint32_t lastReboot; // time stamp for last reboot float diskAvailable; // from dnode status msg - int32_t bandwidthMb; // config by user + int16_t diskAvgUsage; // calc from sys.disk int16_t cpuAvgUsage; // calc from sys.cpu int16_t memoryAvgUsage; // calc from sys.mem - int16_t diskAvgUsage; // calc from sys.disk int16_t bandwidthUsage; // calc from sys.band - uint32_t rack; - uint16_t idc; - uint16_t slot; - int32_t customScore; // config by user - float lbScore; // calc in balance function - int16_t lbStatus; // set in balance function - int16_t lastAllocVnode; // increase while create vnode - SVnodeLoad vload[TSDB_MAX_VNODES]; - char reserved[16]; - char updateEnd[1]; - void * thandle; } SDnodeObj; typedef struct { int32_t dnodeId; - uint32_t ip; - uint32_t publicIp; int32_t vnode; + uint32_t privateIp; + uint32_t publicIp; } SVnodeGid; typedef struct { @@ -150,15 +142,13 @@ typedef struct _vg_obj { uint32_t vgId; char dbName[TSDB_DB_NAME_LEN + 1]; int64_t createdTime; - uint64_t lastCreate; - uint64_t lastRemove; - int32_t numOfVnodes; SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT]; + int32_t numOfVnodes; int32_t numOfTables; int32_t lbIp; int32_t lbTime; int8_t lbStatus; - int8_t reserved[16]; + int8_t reserved[14]; int8_t updateEnd[1]; struct _vg_obj *prev, *next; void * idPool; @@ -170,8 +160,7 @@ typedef struct _db_obj { int8_t dirty; int64_t createdTime; SDbCfg cfg; - int8_t dropStatus; - char reserved[16]; + char reserved[15]; char updateEnd[1]; struct _db_obj *prev, *next; int32_t numOfVgroups; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 38fcb3fd42a58e3d7bbfa6a470053031652b98d3..c908d34136db2526d0f4fb73b9f547c6aacc4573 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -520,18 +520,20 @@ typedef struct { typedef struct { int32_t vgId; + int32_t vnode; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; uint8_t status; uint8_t syncStatus; uint8_t accessState; - uint8_t reserved[6]; + uint8_t reserved[5]; } SVnodeLoad; typedef struct { uint32_t vnode; - char accessState; + uint8_t accessState; + uint8_t reserved[3]; } SVnodeAccess; /* diff --git a/src/mnode/inc/mgmtBalance.h b/src/mnode/inc/mgmtBalance.h index ad55e0645c41f8f166637aca86fd0df14e174a4d..697bc46a7e8d3dcdb07f3bfd2c55595e05bb63b9 100644 --- a/src/mnode/inc/mgmtBalance.h +++ b/src/mnode/inc/mgmtBalance.h @@ -23,6 +23,7 @@ extern "C" { int32_t mgmtInitBalance(); void mgmtCleanupBalance(); +void mgmtStartBalance(int32_t afterMs) ; int32_t mgmtAllocVnodes(SVgObj *pVgroup); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtDServer.h b/src/mnode/inc/mgmtDServer.h index 9f3792907cc4f4e0244d4b883756516e6a8bf675..937ae8f1acd46c48a4811195e7aa5916b0a1db73 100644 --- a/src/mnode/inc/mgmtDServer.h +++ b/src/mnode/inc/mgmtDServer.h @@ -24,21 +24,6 @@ int32_t mgmtInitDServer(); void mgmtCleanupDServer(); void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); - -//extern void *mgmtStatusTimer; -// -//void mgmtSendCreateTableMsg(SMDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle); -//void mgmtSendDropTableMsg(SMDDropTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle); -//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); -//void mgmtSendDropVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle); -// -//int32_t mgmtInitDnodeInt(); -//void mgmtCleanUpDnodeInt(); -// -//void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle); -//void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); -//void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 2fd6cb3d8d395f527d77acb786b6807eb955f085..4dc82fefe381adee2a3f16e16d17523edd7aade1 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -25,7 +25,8 @@ int32_t mgmtInitDnodes(); void mgmtCleanUpDnodes(); int32_t mgmtGetDnodesNum(); int32_t mgmtUpdateDnode(SDnodeObj *pDnode); -SDnodeObj* mgmtGetDnode(uint32_t ip); +SDnodeObj* mgmtGetDnode(int32_t dnodeId); +SDnodeObj* mgmtGetDnodeByIp(uint32_t ip); bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode); bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode); diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index 4c25449c22ad0ed48d55726f8792851be120c22f..b1ee72386ef9bc36635c982685e7885bd1ab5f93 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -24,7 +24,7 @@ void (*mgmtCleanUpAcctsFp)() = NULL; SAcctObj *(*mgmtGetAcctFp)(char *acctName) = NULL; int32_t (*mgmtCheckUserLimitFp)(SAcctObj *pAcct) = NULL; int32_t (*mgmtCheckDbLimitFp)(SAcctObj *pAcct) = NULL; -int32_t (*mgmtCheckTimeSeriesLimitFp)(SAcctObj *pAcct, int32_t numOfTimeSeries) = NULL; +int32_t (*mgmtCheckTableLimitFp)(SAcctObj *pAcct, int32_t numOfTimeSeries) = NULL; int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb) { pthread_mutex_lock(&pAcct->mutex); @@ -137,8 +137,8 @@ int32_t mgmtCheckDbLimit(SAcctObj *pAcct) { } int32_t mgmtCheckTableLimit(SAcctObj *pAcct, int32_t numOfTimeSeries) { - if (mgmtCheckTimeSeriesLimitFp) { - return (*mgmtCheckTimeSeriesLimitFp)(pAcct, numOfTimeSeries); + if (mgmtCheckTableLimitFp) { + return (*mgmtCheckTableLimitFp)(pAcct, numOfTimeSeries); } else { return 0; } diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index fae237c4f55d1ab4ed4a2eca0c47759141831207..116e0ef36ba1bbaaf95ea15e54253f2d93520264 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -20,6 +20,7 @@ int32_t (*mgmtInitBalanceFp)() = NULL; void (*mgmtCleanupBalanceFp)() = NULL; +void (*mgmtStartBalanceFp)(int32_t afterMs) = NULL; int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL; int32_t mgmtInitBalance() { @@ -36,33 +37,28 @@ void mgmtCleanupBalance() { } } +void mgmtStartBalance(int32_t afterMs) { + if (mgmtStartBalanceFp) { + (*mgmtStartBalanceFp)(afterMs); + } +} + int32_t mgmtAllocVnodes(SVgObj *pVgroup) { if (mgmtAllocVnodesFp) { - return mgmtAllocVnodesFp(pVgroup); + return (*mgmtAllocVnodesFp)(pVgroup); } - SDnodeObj *pDnode = mgmtGetDnode(0); + SDnodeObj *pDnode = mgmtGetDnode(1); if (pDnode == NULL) return TSDB_CODE_OTHERS; - int32_t selectedVnode = -1; - int32_t lastAllocVode = pDnode->lastAllocVnode; - - for (int32_t i = 0; i < pDnode->numOfVnodes; i++) { - int32_t vnode = (i + lastAllocVode) % pDnode->numOfVnodes; - if (pDnode->vload[vnode].vgId == 0 && pDnode->vload[vnode].status == TSDB_VN_STATUS_OFFLINE) { - selectedVnode = vnode; - break; - } - } - - if (selectedVnode == -1) { - mError("alloc vnode failed, free vnodes:%d", pDnode->numOfFreeVnodes); - return -1; + if (pDnode->openVnodes < pDnode->numOfTotalVnodes) { + pVgroup->vnodeGid[0].dnodeId = pDnode->dnodeId; + pVgroup->vnodeGid[0].privateIp = pDnode->privateIp; + pVgroup->vnodeGid[0].publicIp = pDnode->publicIp; + mTrace("dnode:%d, alloc one vnode to vgroup", pDnode->dnodeId); + return TSDB_CODE_SUCCESS; } else { - mTrace("allocate vnode:%d, last allocated vnode:%d", selectedVnode, lastAllocVode); - pVgroup->vnodeGid[0].vnode = selectedVnode; - pDnode->lastAllocVnode = selectedVnode + 1; - if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0; - return 0; + mError("dnode:%d, failed to alloc vnode to vgroup", pDnode->dnodeId); + return TSDB_CODE_NO_ENOUGH_DNODES; } } diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 1e9e55e38121696f5fb3d8329cb0432d292b391e..53304c7565502e8cf2e4be689ead66ceadefa612 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -34,7 +34,7 @@ #include "mgmtTable.h" #include "mgmtVgroup.h" -void *tsChildTableSdb; +void *tsChildTableSdb; int32_t tsChildTableUpdateSize; void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); @@ -51,12 +51,12 @@ static void mgmtDestroyChildTable(SChildTableObj *pTable) { } static void mgmtChildTableActionInit() { - mgmtChildTableActionFp[SDB_TYPE_INSERT] = mgmtChildTableActionInsert; - mgmtChildTableActionFp[SDB_TYPE_DELETE] = mgmtChildTableActionDelete; - mgmtChildTableActionFp[SDB_TYPE_UPDATE] = mgmtChildTableActionUpdate; - mgmtChildTableActionFp[SDB_TYPE_ENCODE] = mgmtChildTableActionEncode; - mgmtChildTableActionFp[SDB_TYPE_DECODE] = mgmtChildTableActionDecode; - mgmtChildTableActionFp[SDB_TYPE_RESET] = mgmtChildTableActionReset; + mgmtChildTableActionFp[SDB_TYPE_INSERT] = mgmtChildTableActionInsert; + mgmtChildTableActionFp[SDB_TYPE_DELETE] = mgmtChildTableActionDelete; + mgmtChildTableActionFp[SDB_TYPE_UPDATE] = mgmtChildTableActionUpdate; + mgmtChildTableActionFp[SDB_TYPE_ENCODE] = mgmtChildTableActionEncode; + mgmtChildTableActionFp[SDB_TYPE_DECODE] = mgmtChildTableActionDecode; + mgmtChildTableActionFp[SDB_TYPE_RESET] = mgmtChildTableActionReset; mgmtChildTableActionFp[SDB_TYPE_DESTROY] = mgmtChildTableActionDestroy; } @@ -77,26 +77,26 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); + mError("ctable:%s, not in vgroup:%d", pTable->tableId, pTable->vgId); return NULL; } SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { - mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + mError("ctable:%s, vgroup:%d not in db:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); return NULL; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { - mError("account not exists"); + mError("ctable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct); return NULL; } if (!sdbMaster) { int32_t sid = taosAllocateId(pVgroup->idPool); if (sid != pTable->sid) { - mError("sid:%d is not matched from the master:%d", sid, pTable->sid); + mError("ctable:%s, sid:%d is not matched from the master:%d", pTable->tableId, sid, pTable->sid); return NULL; } } @@ -128,13 +128,13 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { - mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + mError("ctable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); return NULL; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { - mError("account not exists"); + mError("ctable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct); return NULL; } @@ -313,7 +313,7 @@ void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTab void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb); if (numOfTables >= tsMaxTables) { - mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables); + mError("ctable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables); terrno = TSDB_CODE_TOO_MANY_TABLES; return NULL; } @@ -321,14 +321,14 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t char *pTagData = (char *) pCreate->schema; // it is a tag key SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); if (pSuperTable == NULL) { - mError("table:%s, corresponding super table does not exist", pCreate->tableId); + mError("ctable:%s, corresponding super table does not exist", pCreate->tableId); terrno = TSDB_CODE_INVALID_TABLE; return NULL; } SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1); if (pTable == NULL) { - mError("table:%s, failed to alloc memory", pCreate->tableId); + mError("ctable:%s, failed to alloc memory", pCreate->tableId); terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } @@ -345,25 +345,25 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { free(pTable); - mError("table:%s, update sdb error", pCreate->tableId); + mError("ctable:%s, update sdb error", pCreate->tableId); terrno = TSDB_CODE_SDB_ERROR; return NULL; } - mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid); + mTrace("ctable:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid); return pTable; } int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId); + mError("ctable:%s, failed to drop child table, vgroup not exist", pTable->tableId); return TSDB_CODE_OTHERS; } SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); if (pDrop == NULL) { - mError("table:%s, failed to drop child table, no enough memory", pTable->tableId); + mError("ctable:%s, failed to drop child table, no enough memory", pTable->tableId); return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -375,7 +375,7 @@ int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) { SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mTrace("table:%s, send drop table msg", pDrop->tableId); + mTrace("ctable:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { .handle = newMsg, .pCont = pDrop, @@ -395,6 +395,7 @@ void* mgmtGetChildTable(char *tableId) { } int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) { +// TODO: send message to dnode // int32_t col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName); // if (col < 0 || col > pTable->superTable->numOfTags) { // return TSDB_CODE_APP_ERROR; @@ -462,7 +463,7 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p if (usePublicIp) { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; } else { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; } pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); diff --git a/src/mnode/src/mgmtDClient.c b/src/mnode/src/mgmtDClient.c index 22884cc7d005c0d595d1bf0229c4e9b1e8f373b6..da11ad20811734d158aeaa12733ddae178402273 100644 --- a/src/mnode/src/mgmtDClient.c +++ b/src/mnode/src/mgmtDClient.c @@ -83,90 +83,3 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { rpcFreeCont(rpcMsg->pCont); } - -//static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { -// mTrace("drop table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -//} -// -//static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { -// mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -//} -// -// -//static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { -// mTrace("drop vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -//} -// -//static void mgmtProcessAlterVnodeRsp(SRpcMsg *rpcMsg) { -// mTrace("alter vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -//} -// -//static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) { -// mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -//} -// -//static void mgmtProcessAlterStreamRsp(SRpcMsg *rpcMsg) { -// mTrace("alter stream rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -//} -// -//static void mgmtProcessConfigDnodeRsp(SRpcMsg *rpcMsg) { -// mTrace("config dnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -//} -// - - -// -//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { -// mTrace("table:%s, send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle); -//} -// -//void mgmtSendDropVnodeMsg(int32_t vgId, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { -// mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle); -// SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg)); -// SRpcMsg rpcMsg = { -// .handle = ahandle, -// .pCont = pDrop, -// .contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0, -// .code = 0, -// .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE -// }; -// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg); -//} -// - -//// -////int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { -//// char *option, *value; -//// int32_t olen, valen; -//// -//// paGetToken(msg, &option, &olen); -//// if (strncasecmp(option, "unremove", 8) == 0) { -//// mgmtSetDnodeUnRemove(pDnode); -//// return TSDB_CODE_SUCCESS; -//// } else if (strncasecmp(option, "score", 5) == 0) { -//// paGetToken(option + olen + 1, &value, &valen); -//// if (valen > 0) { -//// int32_t score = atoi(value); -//// mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score); -//// pDnode->customScore = score; -//// mgmtUpdateDnode(pDnode); -//// //mgmtStartBalanceTimer(15); -//// } -//// return TSDB_CODE_INVALID_SQL; -//// } else if (strncasecmp(option, "bandwidth", 9) == 0) { -//// paGetToken(msg, &value, &valen); -//// if (valen > 0) { -//// int32_t bandwidthMb = atoi(value); -//// if (bandwidthMb >= 0 && bandwidthMb < 10000000) { -//// mTrace("dnode:%s, bandwidth(Mb) set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->bandwidthMb, bandwidthMb); -//// pDnode->bandwidthMb = bandwidthMb; -//// mgmtUpdateDnode(pDnode); -//// return TSDB_CODE_SUCCESS; -//// } -//// } -//// return TSDB_CODE_INVALID_SQL; -//// } -//// -//// return -1; -////} -//// diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtDServer.c index ce7ac8cb39dc5a6ac41fd7174dd5c1b9ec3cf4c1..177a45764ccd773a3ceb549fd0dccb624c4257c3 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtDServer.c @@ -85,218 +85,3 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) { static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { return TSDB_CODE_SUCCESS; } - -// -// -//static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) { -// SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pCont; -// pCfg->dnode = htonl(pCfg->dnode); -// pCfg->vnode = htonl(pCfg->vnode); -// pCfg->sid = htonl(pCfg->sid); -// mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); -// -// if (!sdbMaster) { -// mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); -// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); -// return; -// } -// -// STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); -// if (pTable == NULL) { -// mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); -// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0); -// return; -// } -// -// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); -// -// //TODO -// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); -// mgmtSendCreateTableMsg(NULL, &ipSet, NULL); -//} -// -//static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { -// if (!sdbMaster) { -// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); -// return; -// } -// -// SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pCont; -// pCfg->dnode = htonl(pCfg->dnode); -// pCfg->vnode = htonl(pCfg->vnode); -// -// SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode); -// if (pVgroup == NULL) { -// mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode); -// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_NOT_ACTIVE_VNODE, NULL, 0); -// return; -// } -// -// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); -// -// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); -// mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL); -//} -// -//static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { -// mTrace("create table rsp received, thandle:%p code:%d", thandle, code); -// if (thandle == NULL) return; -// -// SProcessInfo *info = thandle; -// assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META); -// STableInfo *pTable = info->ahandle; -// -// if (code != TSDB_CODE_SUCCESS) { -// mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId); -// mgmtSetTableDirty(pTable, true); -// } else { -// mTrace("table:%s, created in dnode", pTable->tableId); -// mgmtSetTableDirty(pTable, false); -// } -// -// if (code != TSDB_CODE_SUCCESS) { -// SRpcMsg rpcMsg = {0}; -// rpcMsg.code = code; -// rpcMsg.handle = info->thandle; -// rpcSendResponse(&rpcMsg); -// } else { -// if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) { -// mTrace("table:%s, start to process get meta", pTable->tableId); -// mgmtProcessGetTableMeta(pTable, thandle); -// } else { -// SRpcMsg rpcMsg = {0}; -// rpcMsg.code = code; -// rpcMsg.handle = info->thandle; -// rpcSendResponse(&rpcMsg); -// } -// } -// -// free(info); -//} -// - -//static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { -// mTrace("remove table rsp received, thandle:%p code:%d", thandle, code); -//} -// - -// -//static void mgmtProcessDropVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { -// mTrace("free vnode rsp received, thandle:%p code:%d", thandle, code); -//} -// -//static void mgmtProcessDropStableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { -// mTrace("drop stable rsp received, thandle:%p code:%d", thandle, code); -//} -// -//static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { -// mTrace("create vnode rsp received, thandle:%p code:%d", thandle, code); -// if (thandle == NULL) return; -// -// SProcessInfo *info = thandle; -// assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META); -// info->received++; -// SVgObj *pVgroup = info->ahandle; -// -// bool isGetMeta = false; -// if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) { -// isGetMeta = true; -// } -// -// mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes); -// if (info->received == pVgroup->numOfVnodes) { -// mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta); -// free(info); -// } -//} -// -//void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { -// if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { -// mError("invalid msg type:%d", msgType); -// return; -// } -// -// mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn); -// -// if (msgType == TSDB_MSG_TYPE_DM_CONFIG_TABLE) { -// mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn); -// } else if (msgType == TSDB_MSG_TYPE_DM_CONFIG_VNODE) { -// mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn); -// } else if (msgType == TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP) { -// mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code); -// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_TABLE_RSP) { -// mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code); -// } else if (msgType == TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP) { -// mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code); -// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_VNODE_RSP) { -// mgmtProcessDropVnodeRsp(msgType, pCont, contLen, pConn, code); -// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { -// mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code); -// } else if (msgType == TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP) { -// } else if (msgType == TSDB_MSG_TYPE_CM_ALTER_STREAM_RSP) { -// } else if (msgType == TSDB_MSG_TYPE_DM_STATUS) { -// mgmtProcessDnodeStatus(msgType, pCont, contLen, pConn, code); -// } else { -// mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]); -// } -// -// //rpcFreeCont(pCont); -//} -// -//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { -// mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle); -//} -// -//void mgmtSendDropVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { -// mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle); -// -// SMDDropVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SMDDropVnodeMsg)); -// if (pFreeVnode != NULL) { -// pFreeVnode->vnode = htonl(vnode); -// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_DROP_VNODE, pFreeVnode, sizeof(SMDDropVnodeMsg), ahandle); -// } -//} -// - -//int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { -// char *option, *value; -// int32_t olen, valen; -// -// paGetToken(msg, &option, &olen); -// if (strncasecmp(option, "unremove", 8) == 0) { -// mgmtSetDnodeUnRemove(pDnode); -// return TSDB_CODE_SUCCESS; -// } else if (strncasecmp(option, "score", 5) == 0) { -// paGetToken(option + olen + 1, &value, &valen); -// if (valen > 0) { -// int32_t score = atoi(value); -// mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score); -// pDnode->customScore = score; -// mgmtUpdateDnode(pDnode); -// //mgmtStartBalanceTimer(15); -// } -// return TSDB_CODE_INVALID_SQL; -// } else if (strncasecmp(option, "bandwidth", 9) == 0) { -// paGetToken(msg, &value, &valen); -// if (valen > 0) { -// int32_t bandwidthMb = atoi(value); -// if (bandwidthMb >= 0 && bandwidthMb < 10000000) { -// mTrace("dnode:%s, bandwidth(Mb) set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->bandwidthMb, bandwidthMb); -// pDnode->bandwidthMb = bandwidthMb; -// mgmtUpdateDnode(pDnode); -// return TSDB_CODE_SUCCESS; -// } -// } -// return TSDB_CODE_INVALID_SQL; -// } -// -// return -1; -//} -// -// -//void mgmtCleanUpDnodeInt() { -// if (mgmtCleanUpDnodeIntFp) { -// mgmtCleanUpDnodeIntFp(); -// } -//} -// \ No newline at end of file diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 10a60cf927be2c291b544e40ca5d0732e891ca53..caf84737b0c149459ca5a397061e4513dfc34c9c 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -34,7 +34,7 @@ #include "mgmtUser.h" #include "mgmtVgroup.h" -static void *tsDbSdb = NULL; +static void *tsDbSdb = NULL; static int32_t tsDbUpdateSize; static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); @@ -679,7 +679,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pDb->dropStatus != TSDB_DB_STATUS_READY ? "dropping" : "ready"); + strcpy(pWrite, pDb->dirty != TSDB_DB_STATUS_READY ? "dropping" : "ready"); cols++; numOfRows++; diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 19ee53af0c5bd128dc49259408a661c863859a91..37106323e9bc0accf184b7f543313b93e4840bbc 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -51,75 +51,22 @@ static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg); void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; + maxVnodes = maxVnodes > TSDB_MAX_VNODES ? TSDB_MAX_VNODES : maxVnodes; maxVnodes = maxVnodes < TSDB_MIN_VNODES ? TSDB_MIN_VNODES : maxVnodes; - if (pDnode->numOfTotalVnodes != 0) { - maxVnodes = pDnode->numOfTotalVnodes; + + if (pDnode->numOfTotalVnodes == 0) { + pDnode->numOfTotalVnodes = maxVnodes; } + if (pDnode->alternativeRole == TSDB_DNODE_ROLE_MGMT) { - maxVnodes = 0; + pDnode->numOfTotalVnodes = 0; } - pDnode->numOfVnodes = maxVnodes; - pDnode->numOfFreeVnodes = maxVnodes; pDnode->openVnodes = 0; pDnode->status = TSDB_DN_STATUS_OFFLINE; } -void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) { - int32_t totalVnodes = 0; - - mTrace("dnode:%s, begin calc free vnodes", taosIpStr(pDnode->privateIp)); - for (int32_t i = 0; i < pDnode->numOfVnodes; ++i) { - SVnodeLoad *pVload = pDnode->vload + i; - if (pVload->vgId != 0) { - mTrace("dnode:%d, calc free vnodes, vnode:%d, status:%d %s, syncstatus:%d %s", - pDnode->dnodeId, pVload->vgId, - pVload->status, taosGetVnodeStatusStr(pVload->status), - pVload->syncStatus, taosGetVnodeSyncStatusStr(pVload->syncStatus)); - totalVnodes++; - } - } - - pDnode->numOfFreeVnodes = pDnode->numOfVnodes - totalVnodes; - mTrace("dnode:%s, numOfVnodes:%d, numOfFreeVnodes:%d, totalVnodes:%d", - taosIpStr(pDnode->privateIp), pDnode->numOfVnodes, pDnode->numOfFreeVnodes, totalVnodes); -} - -void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) { - SDnodeObj *pDnode; - - for (int32_t i = 0; i < numOfVnodes; ++i) { - pDnode = mgmtGetDnode(vnodeGid[i].ip); - if (pDnode) { - SVnodeLoad *pVload = pDnode->vload + vnodeGid[i].vnode; - memset(pVload, 0, sizeof(SVnodeLoad)); - //pVload->vnode = vnodeGid[i].vnode; - pVload->vgId = vgId; - mTrace("dnode:%s, vnode:%d add to vgroup:%d", taosIpStr(pDnode->privateIp), vnodeGid[i].vnode, pVload->vgId); - mgmtCalcNumOfFreeVnodes(pDnode); - } else { - mError("dnode:%s, not in dnode DB!!!", taosIpStr(vnodeGid[i].ip)); - } - } -} - -void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes) { - SDnodeObj *pDnode; - - for (int32_t i = 0; i < numOfVnodes; ++i) { - pDnode = mgmtGetDnode(vnodeGid[i].ip); - if (pDnode) { - SVnodeLoad *pVload = pDnode->vload + vnodeGid[i].vnode; - mTrace("dnode:%s, vnode:%d remove from vgroup:%d", taosIpStr(vnodeGid[i].ip), vnodeGid[i].vnode, pVload->vgId); - memset(pVload, 0, sizeof(SVnodeLoad)); - mgmtCalcNumOfFreeVnodes(pDnode); - } else { - mError("dnode:%s not in dnode DB!!!", taosIpStr(vnodeGid[i].ip)); - } - } -} - bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { uint32_t status = pDnode->moduleStatus & (1 << moduleType); return status > 0; @@ -338,11 +285,10 @@ static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) pShow->offset[0] = 0; for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - // TODO: if other thread drop dnode ???? SDnodeObj *pDnode = NULL; if (pShow->payloadLen > 0 ) { uint32_t ip = ip2uint(pShow->payload); - pDnode = mgmtGetDnode(ip); + pDnode = mgmtGetDnodeByIp(ip); if (NULL == pDnode) { return TSDB_CODE_NODE_OFFLINE; } @@ -434,15 +380,14 @@ int32_t mgmtInitDnodes() { return mgmtInitDnodesFp(); } else { tsDnodeObj.dnodeId = 1; - tsDnodeObj.privateIp = inet_addr(tsPrivateIp);; + tsDnodeObj.privateIp = inet_addr(tsPrivateIp); + tsDnodeObj.publicIp = inet_addr(tsPublicIp); tsDnodeObj.createdTime = taosGetTimestampMs(); - tsDnodeObj.lastReboot = taosGetTimestampSec(); + tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores; - tsDnodeObj.status = TSDB_DN_STATUS_READY; tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; - tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; - tsDnodeObj.thandle = (void *) (1); //hack way - tsDnodeObj.status = TSDB_DN_STATUS_READY; + tsDnodeObj.status = TSDB_DN_STATUS_OFFLINE; + tsDnodeObj.lastReboot = taosGetTimestampSec(); mgmtSetDnodeMaxVnodes(&tsDnodeObj); tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT); @@ -458,31 +403,30 @@ int32_t mgmtInitDnodes() { void mgmtCleanUpDnodes() { if (mgmtCleanUpDnodesFp) { - mgmtCleanUpDnodesFp(); + (*mgmtCleanUpDnodesFp)(); } } -SDnodeObj *mgmtGetDnode(uint32_t ip) { +SDnodeObj *mgmtGetDnode(int32_t dnodeId) { if (mgmtGetDnodeFp) { - return mgmtGetDnodeFp(ip); - } else { + return (*mgmtGetDnodeFp)(dnodeId); + } + if (dnodeId == 1) { return &tsDnodeObj; } + return NULL; } -SDnodeObj *mgmtGetDnodeByIp(int32_t dnodeId) { +SDnodeObj *mgmtGetDnodeByIp(uint32_t ip) { if (mgmtGetDnodeByIpFp) { - return mgmtGetDnodeByIpFp(dnodeId); + return (*mgmtGetDnodeByIpFp)(ip); } - if (dnodeId != 0) { - return &tsDnodeObj; - } - return NULL; + return &tsDnodeObj; } int32_t mgmtGetDnodesNum() { if (mgmtGetDnodesNumFp) { - return mgmtGetDnodesNumFp(); + return (*mgmtGetDnodesNumFp)(); } else { return 1; } @@ -490,7 +434,7 @@ int32_t mgmtGetDnodesNum() { int32_t mgmtUpdateDnode(SDnodeObj *pDnode) { if (mgmtUpdateDnodeFp) { - return mgmtUpdateDnodeFp(pDnode); + return (*mgmtUpdateDnodeFp)(pDnode); } else { return 0; } @@ -498,7 +442,7 @@ int32_t mgmtUpdateDnode(SDnodeObj *pDnode) { void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) { if (mgmtGetNextDnodeFp) { - return mgmtGetNextDnodeFp(pShow, pDnode); + return (*mgmtGetNextDnodeFp)(pShow, pDnode); } else { if (*pDnode == NULL) { *pDnode = &tsDnodeObj; @@ -512,14 +456,12 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) { void mgmtSetDnodeUnRemove(SDnodeObj *pDnode) { if (mgmtSetDnodeUnRemoveFp) { - mgmtSetDnodeUnRemoveFp(pDnode); + (*mgmtSetDnodeUnRemoveFp)(pDnode); } } bool mgmtCheckConfigShow(SGlobalConfig *cfg) { - if (cfg->cfgType & TSDB_CFG_CTYPE_B_CLUSTER) - return false; - if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) + if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW)) return false; return true; } @@ -583,7 +525,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { if (pStatus->dnodeId == 0) { pDnode = mgmtGetDnodeByIp(pStatus->privateIp); if (pDnode == NULL) { - mTrace("dnode not created in cluster, privateIp:%s, name:%s, ", taosIpStr(htonl(pStatus->dnodeId)), pStatus->dnodeName); + mTrace("dnode not created, privateIp:%s, name:%s, ", taosIpStr(htonl(pStatus->dnodeId)), pStatus->dnodeName); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); return; } @@ -603,7 +545,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { pDnode->publicIp = htonl(pStatus->publicIp); pDnode->lastReboot = htonl(pStatus->lastReboot); pDnode->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); - pDnode->openVnodes = htons(pStatus->openVnodes); pDnode->numOfCores = htons(pStatus->numOfCores); pDnode->diskAvailable = pStatus->diskAvailable; pDnode->alternativeRole = pStatus->alternativeRole; @@ -619,48 +560,21 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { //mgmtUpdateMnodeIp(); } - for (int32_t j = 0; j < pDnode->openVnodes; ++j) { - pStatus->load[j].vgId = htonl(pStatus->load[j].vgId); - pStatus->load[j].totalStorage = htobe64(pStatus->load[j].totalStorage); - pStatus->load[j].compStorage = htobe64(pStatus->load[j].compStorage); - pStatus->load[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten); + int32_t openVnodes = htons(pStatus->openVnodes); + for (int32_t j = 0; j < openVnodes; ++j) { + pDnode->vload[j].vgId = htonl(pStatus->load[j].vgId); + pDnode->vload[j].totalStorage = htobe64(pStatus->load[j].totalStorage); + pDnode->vload[j].compStorage = htobe64(pStatus->load[j].compStorage); + pDnode->vload[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten); - bool existInMnode = false; - for (int32_t vnode = 0; vnode < pDnode->numOfVnodes; ++vnode) { - SVnodeLoad *pVload = &(pDnode->vload[vnode]); - if (pVload->vgId == pStatus->load[j].vgId) { - existInMnode = true; - } - } - - if (!existInMnode) { + SVgObj *pVgroup = mgmtGetVgroup(pStatus->load[j].vgId); + if (pVgroup == NULL) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); mPrint("dnode:%d, vnode:%d not exist in mnode, drop it", pDnode->dnodeId, pStatus->load[j].vgId); mgmtSendDropVnodeMsg(pStatus->load[j].vgId, &ipSet, NULL); } } - for (int32_t vnode = 0; vnode < pDnode->numOfVnodes; ++vnode) { - SVnodeLoad *pVload = &(pDnode->vload[vnode]); - - bool existInDnode = false; - for (int32_t j = 0; j < pDnode->openVnodes; ++j) { - if (htonl(pStatus->load[j].vgId) == pVload->vgId) { - existInDnode = true; - break; - } - } - - if (!existInDnode) { - mPrint("dnode:%d, vnode:%d not exist in dnode, create it", pDnode->dnodeId, pVload->vgId); - SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId); - if (pVgroup != NULL) { - SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); - mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL); - } - } - } - if (pDnode->status != TSDB_DN_STATUS_READY) { mTrace("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TSDB_DN_STATUS_READY; diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 799d123ac3cdce6cdde6dcc15480531da444463e..ca138bf42de4352ba4398a8b45bf9a2a0a4c39e7 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -540,7 +540,7 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta if (usePublicIp) { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; } else { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; } pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index a4539f7b9f2b6200e7b4aecdaaa9113902f16743..b0d506980a3cd63e1fe54900920846ba86ca3921 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -65,8 +65,6 @@ static void mgmtSuperTableActionInit() { mgmtSuperTableActionFp[SDB_TYPE_DECODE] = mgmtSuperTableActionDecode; mgmtSuperTableActionFp[SDB_TYPE_RESET] = mgmtSuperTableActionReset; mgmtSuperTableActionFp[SDB_TYPE_DESTROY] = mgmtSuperTableActionDestroy; - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables); } void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index abf1d9162c216e7fe14b6081abc2c9bcbaaa3e65..6001c97a92296506708dcab874d8ad44fb7530b8 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -52,6 +52,8 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessSuperTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); static int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); @@ -84,8 +86,8 @@ int32_t mgmtInitTables() { mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, NULL); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, NULL); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropStableRsp); return TSDB_CODE_SUCCESS; } @@ -109,17 +111,15 @@ STableInfo* mgmtGetTable(char *tableId) { return NULL; } -STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) { - SDnodeObj *pObj = mgmtGetDnode(dnodeIp); - if (pObj != NULL && vnode >= 0 && vnode < pObj->numOfVnodes) { - int32_t vgId = pObj->vload[vnode].vgId; - SVgObj *pVgroup = mgmtGetVgroup(vgId); - if (pVgroup) { - return pVgroup->tableList[sid]; - } +STableInfo* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_t sid) { + SDnodeObj *pObj = mgmtGetDnode(dnodeId); + SVgObj *pVgroup = mgmtGetVgroup(vnode); + + if (pObj == NULL || pVgroup == NULL) { + return NULL; } - return NULL; + return pVgroup->tableList[sid]; } int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp) { @@ -587,7 +587,7 @@ void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SDbObj* pDb = mgmtGetDbByTableId(pTable->tableId); - if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) { + if (pDb == NULL || pDb->dirty) { mError("table:%s, failed to get table meta, db not selected", pTable->tableId); rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED; rpcSendResponse(&rpcRsp); @@ -783,6 +783,10 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { free(queueMsg); } +static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { + mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); +} + static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; @@ -831,3 +835,37 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); free(queueMsg); } + +static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) { + mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); +} + +// +// +//static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) { +// SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pCont; +// pCfg->dnode = htonl(pCfg->dnode); +// pCfg->vnode = htonl(pCfg->vnode); +// pCfg->sid = htonl(pCfg->sid); +// mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); +// +// if (!sdbMaster) { +// mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); +// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); +// return; +// } +// +// STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); +// if (pTable == NULL) { +// mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); +// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0); +// return; +// } +// +// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); +// +// //TODO +// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); +// mgmtSendCreateTableMsg(NULL, &ipSet, NULL); +//} +// \ No newline at end of file diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 7faaed1d8097284f1176e0ac6813931c265b5e52..96854ab9c047f91932c1a2d7c6bba61593c14454 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -111,11 +111,11 @@ int32_t mgmtInitVgroups() { if (tsIsCluster && pVgroup->vnodeGid[0].publicIp == 0) { pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp); - pVgroup->vnodeGid[0].ip = inet_addr(tsPrivateIp); + pVgroup->vnodeGid[0].privateIp = inet_addr(tsPrivateIp); sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1); } - mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); + // mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); } mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); @@ -131,9 +131,6 @@ SVgObj *mgmtGetVgroup(int32_t vgId) { return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); } -/* - * TODO: check if there is enough sids - */ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { return pDb->pHead; } @@ -162,13 +159,13 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); mgmtAddVgroupIntoDb(pDb, pVgroup); - mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); + // mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); sdbInsertRow(tsVgroupSdb, pVgroup, 0); mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - mPrint("vgroup:%d, dnode:%s vnode:%d", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); + mPrint("vgroup:%d, dnode:%d vnode:%d", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].dnodeId), pVgroup->vnodeGid[i].vnode); } pMsg->ahandle = pVgroup; @@ -305,9 +302,9 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { } char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { - SDnodeObj *pDnode = mgmtGetDnode(pVnode->ip); + SDnodeObj *pDnode = mgmtGetDnode(pVnode->dnodeId); if (pDnode == NULL) { - mError("dnode:%s, vgroup:%d, vnode:%d dnode not exist", taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode); + mError("vgroup:%d, not exist in dnode:%d", pVgroup->vgId, pDnode->dnodeId); return "null"; } @@ -315,14 +312,13 @@ char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { return "offline"; } - SVnodeLoad *vload = pDnode->vload + pVnode->vnode; - if (vload->vgId != pVgroup->vgId) { - mError("dnode:%s, vgroup:%d, not same with dnode vgroup:%d", - taosIpStr(pVnode->ip), pVgroup->vgId, vload->vgId); - return "null"; + for (int i = 0; i < pDnode->openVnodes; ++i) { + if (pDnode->vload[i].vgId == pVgroup->vgId) { + return (char*)taosGetVnodeStatusStr(pDnode->vload[i].status); + } } - - return (char*)taosGetVnodeStatusStr(vload->status); + + return "null"; } int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { @@ -362,7 +358,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo cols++; for (int32_t i = 0; i < maxReplica; ++i) { - tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip); + tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; strcpy(pWrite, ipstr); cols++; @@ -372,7 +368,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - if (pVgroup->vnodeGid[i].ip != 0) { + if (pVgroup->vnodeGid[i].dnodeId != 0) { char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i); strcpy(pWrite, vnodeStatus); } else { @@ -394,6 +390,11 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo } static void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { + SVgObj *pVgroup = row; + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + pVgroup->vnodeGid[i].vnode = pVgroup->vgId; + } + return NULL; } @@ -405,7 +406,7 @@ static void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t mgmtRemoveVgroupFromDb(pDb, pVgroup); } - mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes); + // mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes); tfree(pVgroup->tableList); return NULL; @@ -515,7 +516,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { SVnodeDesc *vpeerDesc = pVnode->vpeerDesc; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { vpeerDesc[j].vgId = htonl(pVgroup->vgId); - vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); + vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp); } return pVnode; @@ -542,7 +543,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { .port = tsDnodeMnodePort }; for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - ipSet.ip[i] = pVgroup->vnodeGid[i].ip; + ipSet.ip[i] = pVgroup->vnodeGid[i].privateIp; } return ipSet; } @@ -573,7 +574,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].privateIp); mgmtSendCreateVnodeMsg(pVgroup, &ipSet, ahandle); } } @@ -637,7 +638,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].privateIp); mgmtSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle); } } @@ -686,12 +687,36 @@ void mgmtUpdateVgroupIp(SDnodeObj *pDnode) { SVnodeGid *vnodeGid = pVgroup->vnodeGid + i; if (vnodeGid->dnodeId == pDnode->dnodeId) { mPrint("vgroup:%d, dnode:%d, privateIp:%s change to %s, publicIp:%s change to %s", - pVgroup->vgId, vnodeGid->dnodeId, pDnode->privateIp, taosIpStr(vnodeGid->ip), + pVgroup->vgId, vnodeGid->dnodeId, pDnode->privateIp, taosIpStr(vnodeGid->privateIp), pDnode->publicIp, taosIpStr(vnodeGid->publicIp)); vnodeGid->publicIp = pDnode->publicIp; - vnodeGid->ip = pDnode->privateIp; + vnodeGid->privateIp = pDnode->privateIp; sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1); } } } } + +//static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { +// if (!sdbMaster) { +// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0); +// return; +// } +// +// SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pCont; +// pCfg->dnode = htonl(pCfg->dnode); +// pCfg->vnode = htonl(pCfg->vnode); +// +// SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode); +// if (pVgroup == NULL) { +// mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode); +// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_NOT_ACTIVE_VNODE, NULL, 0); +// return; +// } +// +// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); +// +// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); +// mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL); +//} +// \ No newline at end of file