/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" #include "trpc.h" #include "tsync.h" #include "tbalance.h" #include "tutil.h" #include "ttime.h" #include "tsocket.h" #include "tdataformat.h" #include "mnodeDef.h" #include "mnodeInt.h" #include "mnodeMnode.h" #include "mnodeDnode.h" #include "mnodeSdb.h" #include "mnodeShow.h" #include "mnodeUser.h" static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; static SRpcIpSet tsMnodeIpSetForShell; static SRpcIpSet tsMnodeIpSetForPeer; static SDMMnodeInfos tsMnodeInfos; static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); #if defined(LINUX) static pthread_rwlock_t tsMnodeLock; #define mnodeMnodeWrLock() pthread_rwlock_wrlock(&tsMnodeLock) #define mnodeMnodeRdLock() pthread_rwlock_rdlock(&tsMnodeLock) #define mnodeMnodeUnLock() pthread_rwlock_unlock(&tsMnodeLock) #define mnodeMnodeInitLock() pthread_rwlock_init(&tsMnodeLock, NULL) #define mnodeMnodeDestroyLock() pthread_rwlock_destroy(&tsMnodeLock) #else static pthread_mutex_t tsMnodeLock; #define mnodeMnodeWrLock() pthread_mutex_lock(&tsMnodeLock) #define mnodeMnodeRdLock() pthread_mutex_lock(&tsMnodeLock) #define mnodeMnodeUnLock() pthread_mutex_unlock(&tsMnodeLock) #define mnodeMnodeInitLock() pthread_mutex_init(&tsMnodeLock, NULL) #define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock) #endif static int32_t mnodeMnodeActionDestroy(SSdbOper *pOper) { tfree(pOper->pObj); return TSDB_CODE_SUCCESS; } static int32_t mnodeMnodeActionInsert(SSdbOper *pOper) { SMnodeObj *pMnode = pOper->pObj; SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST; pDnode->isMgmt = true; mnodeDecDnodeRef(pDnode); return TSDB_CODE_SUCCESS; } static int32_t mnodeMnodeActionDelete(SSdbOper *pOper) { SMnodeObj *pMnode = pOper->pObj; SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST; pDnode->isMgmt = false; mnodeDecDnodeRef(pDnode); mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId); return TSDB_CODE_SUCCESS; } static int32_t mnodeMnodeActionUpdate(SSdbOper *pOper) { SMnodeObj *pMnode = pOper->pObj; SMnodeObj *pSaved = mnodeGetMnode(pMnode->mnodeId); if (pMnode != pSaved) { memcpy(pSaved, pMnode, pOper->rowSize); free(pMnode); } mnodeDecMnodeRef(pSaved); return TSDB_CODE_SUCCESS; } static int32_t mnodeMnodeActionEncode(SSdbOper *pOper) { SMnodeObj *pMnode = pOper->pObj; memcpy(pOper->rowData, pMnode, tsMnodeUpdateSize); pOper->rowSize = tsMnodeUpdateSize; return TSDB_CODE_SUCCESS; } static int32_t mnodeMnodeActionDecode(SSdbOper *pOper) { SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); if (pMnode == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; memcpy(pMnode, pOper->rowData, tsMnodeUpdateSize); pOper->pObj = pMnode; return TSDB_CODE_SUCCESS; } static int32_t mnodeMnodeActionRestored() { if (mnodeGetMnodesNum() == 1) { SMnodeObj *pMnode = NULL; void *pIter = mnodeGetNextMnode(NULL, &pMnode); if (pMnode != NULL) { pMnode->role = TAOS_SYNC_ROLE_MASTER; mnodeDecMnodeRef(pMnode); } sdbFreeIter(pIter); } mnodeUpdateMnodeIpSet(); return TSDB_CODE_SUCCESS; } int32_t mnodeInitMnodes() { mnodeMnodeInitLock(); SMnodeObj tObj; tsMnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; SSdbTableDesc tableDesc = { .tableId = SDB_TABLE_MNODE, .tableName = "mnodes", .hashSessions = TSDB_DEFAULT_MNODES_HASH_SIZE, .maxRowSize = tsMnodeUpdateSize, .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .keyType = SDB_KEY_INT, .insertFp = mnodeMnodeActionInsert, .deleteFp = mnodeMnodeActionDelete, .updateFp = mnodeMnodeActionUpdate, .encodeFp = mnodeMnodeActionEncode, .decodeFp = mnodeMnodeActionDecode, .destroyFp = mnodeMnodeActionDestroy, .restoredFp = mnodeMnodeActionRestored }; tsMnodeSdb = sdbOpenTable(&tableDesc); if (tsMnodeSdb == NULL) { mError("failed to init mnodes data"); return -1; } mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mnodeGetMnodeMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mnodeRetrieveMnodes); mTrace("table:mnodes table is created"); return TSDB_CODE_SUCCESS; } void mnodeCleanupMnodes() { sdbCloseTable(tsMnodeSdb); mnodeMnodeDestroyLock(); } int32_t mnodeGetMnodesNum() { return sdbGetNumOfRows(tsMnodeSdb); } void *mnodeGetMnode(int32_t mnodeId) { return sdbGetRow(tsMnodeSdb, &mnodeId); } void mnodeIncMnodeRef(SMnodeObj *pMnode) { sdbIncRef(tsMnodeSdb, pMnode); } void mnodeDecMnodeRef(SMnodeObj *pMnode) { sdbDecRef(tsMnodeSdb, pMnode); } void *mnodeGetNextMnode(void *pIter, SMnodeObj **pMnode) { return sdbFetchRow(tsMnodeSdb, pIter, (void **)pMnode); } char *mnodeGetMnodeRoleStr(int32_t role) { switch (role) { case TAOS_SYNC_ROLE_OFFLINE: return "offline"; case TAOS_SYNC_ROLE_UNSYNCED: return "unsynced"; case TAOS_SYNC_ROLE_SLAVE: return "slave"; case TAOS_SYNC_ROLE_MASTER: return "master"; default: return "undefined"; } } void mnodeUpdateMnodeIpSet() { mPrint("update mnodes ipset, numOfIps:%d ", mnodeGetMnodesNum()); mnodeMnodeWrLock(); memset(&tsMnodeIpSetForShell, 0, sizeof(SRpcIpSet)); memset(&tsMnodeIpSetForPeer, 0, sizeof(SRpcIpSet)); memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos)); int32_t index = 0; void * pIter = NULL; while (1) { SMnodeObj *pMnode = NULL; pIter = mnodeGetNextMnode(pIter, &pMnode); if (pMnode == NULL) break; SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode != NULL) { strcpy(tsMnodeIpSetForShell.fqdn[index], pDnode->dnodeFqdn); tsMnodeIpSetForShell.port[index] = htons(pDnode->dnodePort); mTrace("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForShell.fqdn[index], htons(tsMnodeIpSetForShell.port[index])); strcpy(tsMnodeIpSetForPeer.fqdn[index], pDnode->dnodeFqdn); tsMnodeIpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); mTrace("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeIpSetForPeer.fqdn[index], htons(tsMnodeIpSetForPeer.port[index])); tsMnodeInfos.nodeInfos[index].nodeId = htonl(pMnode->mnodeId); strcpy(tsMnodeInfos.nodeInfos[index].nodeEp, pDnode->dnodeEp); if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { tsMnodeIpSetForShell.inUse = index; tsMnodeIpSetForPeer.inUse = index; tsMnodeInfos.inUse = index; } mPrint("mnode:%d, ep:%s %s", pDnode->dnodeId, pDnode->dnodeEp, pMnode->role == TAOS_SYNC_ROLE_MASTER ? "master" : ""); index++; } mnodeDecDnodeRef(pDnode); mnodeDecMnodeRef(pMnode); } tsMnodeInfos.nodeNum = index; tsMnodeIpSetForShell.numOfIps = index; tsMnodeIpSetForPeer.numOfIps = index; sdbFreeIter(pIter); mnodeMnodeUnLock(); } void mnodeGetMnodeIpSetForPeer(SRpcIpSet *ipSet) { mnodeMnodeRdLock(); *ipSet = tsMnodeIpSetForPeer; mnodeMnodeUnLock(); } void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet) { mnodeMnodeRdLock(); *ipSet = tsMnodeIpSetForShell; mnodeMnodeUnLock(); } void mnodeGetMnodeInfos(void *mnodeInfos) { mnodeMnodeRdLock(); *(SDMMnodeInfos *)mnodeInfos = tsMnodeInfos; mnodeMnodeUnLock(); } int32_t mnodeAddMnode(int32_t dnodeId) { SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); pMnode->mnodeId = dnodeId; pMnode->createdTime = taosGetTimestampMs(); SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsMnodeSdb, .pObj = pMnode, }; int32_t code = sdbInsertRow(&oper); if (code != TSDB_CODE_SUCCESS) { tfree(pMnode); code = TSDB_CODE_SDB_ERROR; } mnodeUpdateMnodeIpSet(); return code; } void mnodeDropMnodeLocal(int32_t dnodeId) { SMnodeObj *pMnode = mnodeGetMnode(dnodeId); if (pMnode != NULL) { SSdbOper oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode}; sdbDeleteRow(&oper); mnodeDecMnodeRef(pMnode); } mnodeUpdateMnodeIpSet(); } int32_t mnodeDropMnode(int32_t dnodeId) { SMnodeObj *pMnode = mnodeGetMnode(dnodeId); if (pMnode == NULL) { return TSDB_CODE_DNODE_NOT_EXIST; } SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsMnodeSdb, .pObj = pMnode }; int32_t code = sdbDeleteRow(&oper); if (code != TSDB_CODE_SUCCESS) { code = TSDB_CODE_SDB_ERROR; } sdbDecRef(tsMnodeSdb, pMnode); mnodeUpdateMnodeIpSet(); return code; } static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { sdbUpdateMnodeRoles(); SUserObj *pUser = mnodeGetUserFromConn(pConn); if (pUser == NULL) return 0; if (strcmp(pUser->pAcct->user, "root") != 0) { mnodeDecUserRef(pUser); return TSDB_CODE_NO_RIGHTS; } int32_t cols = 0; SSchema *pSchema = pMeta->schema; pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "id"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 40 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "end_point"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "role"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; strcpy(pSchema[cols].name, "create_time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pMeta->numOfColumns = htons(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; for (int32_t i = 1; i < cols; ++i) { pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; } pShow->numOfRows = mnodeGetMnodesNum(); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->pIter = NULL; mnodeDecUserRef(pUser); return 0; } static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; int32_t cols = 0; SMnodeObj *pMnode = NULL; char *pWrite; while (numOfRows < rows) { pShow->pIter = mnodeGetNextMnode(pShow->pIter, &pMnode); if (pMnode == NULL) break; cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; *(int16_t *)pWrite = pMnode->mnodeId; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode != NULL) { STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->dnodeEp, pShow->bytes[cols] - VARSTR_HEADER_SIZE); } else { STR_WITH_MAXSIZE_TO_VARSTR(pWrite, "invalid ep", pShow->bytes[cols] - VARSTR_HEADER_SIZE); } mnodeDecDnodeRef(pDnode); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; char* roles = mnodeGetMnodeRoleStr(pMnode->role); STR_TO_VARSTR(pWrite, roles); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; *(int64_t *)pWrite = pMnode->createdTime; cols++; numOfRows++; mnodeDecMnodeRef(pMnode); } pShow->numOfReads += numOfRows; return numOfRows; }