From a00be5e3e83452f3a572266bcc2462d9ba18775f Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 6 Mar 2020 17:06:02 +0800 Subject: [PATCH] refact code for cluster --- src/inc/sdb.h | 15 ------------- src/inc/taosmsg.h | 11 ---------- src/mnode/inc/mgmtBalance.h | 1 - src/mnode/inc/mgmtDnode.h | 2 ++ src/mnode/src/mgmtDb.c | 4 ++-- src/mnode/src/mgmtDnode.c | 3 +++ src/mnode/src/mgmtDnodeInt.c | 2 +- src/mnode/src/mgmtShell.c | 1 - src/mnode/src/mgmtSystem.c | 4 ---- src/mnode/src/mgmtVgroup.c | 21 ++++++++++++++++++ src/sdb/inc/sdbint.h | 41 +++++++++++++++--------------------- src/sdb/src/sdbEngine.c | 9 ++++---- src/sdb/src/sdbstr.c | 2 +- 13 files changed, 51 insertions(+), 65 deletions(-) diff --git a/src/inc/sdb.h b/src/inc/sdb.h index d0239522a9..4b4de1ac4b 100644 --- a/src/inc/sdb.h +++ b/src/inc/sdb.h @@ -25,18 +25,10 @@ extern "C" { extern uint16_t tsMgmtMgmtPort; extern uint16_t tsMgmtSyncPort; -extern int sdbMaxNodes; extern int tsMgmtPeerHBTimer; // seconds -extern char sdbZone[]; -extern char sdbMasterIp[]; -extern char sdbPrivateIp[]; extern char * sdbStatusStr[]; extern char * sdbRoleStr[]; -extern void * mnodeSdb; -extern int sdbExtConns; extern int sdbMaster; -extern uint32_t sdbPublicIp; -extern uint32_t sdbMasterStartTime; extern SRpcIpSet *pSdbIpList; extern SRpcIpSet *pSdbPublicIpList; @@ -89,14 +81,9 @@ typedef struct { // internal int syncFd; void *hbTimer; - void *thandle; void *pSync; } SSdbPeer; -SSdbPeer *sdbAddPeer(uint32_t ip, uint32_t publicIp, char role); - -void sdbUpdateIpList(); - extern SSdbPeer *sdbPeer[]; #define sdbInited (sdbPeer[0]) #define sdbStatus (sdbPeer[0]->status) @@ -130,8 +117,6 @@ int sdbInitPeers(char *directory); void sdbCleanUpPeers(); -int sdbCfgNode(char *cont); - int64_t sdbGetVersion(); int32_t sdbGetRunStatus(); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 7914c6cc86..bf03187010 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -600,17 +600,6 @@ typedef struct { SVnodeAccess vnodeAccess[]; } SStatusRsp; -// internal message -typedef struct { - uint32_t destId; - uint32_t destIp; - char tableId[TSDB_UNI_LEN + 1]; - char empty[3]; - uint8_t msgType; - int32_t msgLen; - uint8_t content[0]; -} SIntMsg; - typedef struct { char spi; char encrypt; diff --git a/src/mnode/inc/mgmtBalance.h b/src/mnode/inc/mgmtBalance.h index 61331b9b3d..7a6bb3a9aa 100644 --- a/src/mnode/inc/mgmtBalance.h +++ b/src/mnode/inc/mgmtBalance.h @@ -22,7 +22,6 @@ extern "C" { #include "mnode.h" -void mgmtStartBalanceTimer(int64_t mseconds); int32_t mgmtInitBalance(); void mgmtCleanupBalance(); int32_t mgmtAllocVnodes(SVgObj *pVgroup); diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 193d4544a1..6532c98612 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -58,6 +58,8 @@ SDnodeObj* mgmtGetDnode(uint32_t ip); extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip); extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip); +void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode); + #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index bedd51dcff..0186f51e16 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -455,9 +455,9 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { // // SVgObj *pVgroup = pDb->pHead; // while (pVgroup != NULL) { -// mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); +// balanceUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); // if (oldReplicaNum < pDb->cfg.replications) { -// if (!mgmtAddVnode(pVgroup, NULL, NULL)) { +// if (!balanceAddVnode(pVgroup, NULL, NULL)) { // mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); // code = TSDB_CODE_NO_ENOUGH_DNODES; // } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 8fea669ea0..a9c784eca9 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -609,6 +609,9 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) { int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { if (mgmtGetScoresMetaFp) { + SUserObj *pUser = mgmtGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; return mgmtGetScoresMetaFp(pMeta, pShow, pConn); } else { return TSDB_CODE_OPS_NOT_SUPPORT; diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 5a365f220a..100e76b10b 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -312,7 +312,7 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score); pDnode->customScore = score; mgmtUpdateDnode(pDnode); - mgmtStartBalanceTimer(15); + //mgmtStartBalanceTimer(15); } return TSDB_CODE_INVALID_SQL; } else if (strncasecmp(option, "bandwidth", 9) == 0) { diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 5c3b540916..a889c16f75 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -51,7 +51,6 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); -uint32_t mgmtAccessSquence = 0; void *tsShellConnServer = NULL; void mgmtProcessTranRequest(SSchedMsg *sched) { diff --git a/src/mnode/src/mgmtSystem.c b/src/mnode/src/mgmtSystem.c index e36788a5dd..bcb5f64a7b 100644 --- a/src/mnode/src/mgmtSystem.c +++ b/src/mnode/src/mgmtSystem.c @@ -61,10 +61,6 @@ int32_t mgmtCheckMgmtRunning() { tsetModuleStatus(TSDB_MOD_MGMT); -// strcpy(sdbMasterIp, mgmtIpStr[0]); -// strcpy(sdbPrivateIp, tsPrivateIp); -// sdbPublicIp = inet_addr(tsPublicIp); - return 0; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 10b4244bf8..b0ff80a819 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -318,6 +318,27 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { return 0; } +char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { + SDnodeObj *pDnode = mgmtGetDnode(pVnode->ip); + if (pDnode == NULL) { + mError("dnode:%s, vgroup:%d, vnode:%d dnode not exist", taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode); + return "null"; + } + + if (pDnode->status == TSDB_DN_STATUS_OFFLINE) { + return "offline"; + } + + SVnodeLoad *vload = pDnode->vload + pVnode->vnode; + if (vload->vgId != pVgroup->vgId || vload->vnode != pVnode->vnode) { + mError("dnode:%s, vgroup:%d, vnode:%d not same with dnode vgroup:%d vnode:%d", + taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode, vload->vgId, vload->vnode); + return "null"; + } + + return (char*)taosGetVnodeStatusStr(vload->status); +} + int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; SVgObj *pVgroup = NULL; diff --git a/src/sdb/inc/sdbint.h b/src/sdb/inc/sdbint.h index d0977f2e2f..8bb28b100e 100644 --- a/src/sdb/inc/sdbint.h +++ b/src/sdb/inc/sdbint.h @@ -48,6 +48,21 @@ #define sdbPrint(...) \ { tprintf("MND-SDB ", 255, __VA_ARGS__); } +#define mpeerError(...) \ + if (sdbDebugFlag & DEBUG_ERROR) { \ + tprintf("ERROR MND-MPEER ", 255, __VA_ARGS__); \ + } +#define mpeerWarn(...) \ + if (sdbDebugFlag & DEBUG_WARN) { \ + tprintf("WARN MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \ + } +#define mpeerTrace(...) \ + if (sdbDebugFlag & DEBUG_TRACE) { \ + tprintf("MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \ + } +#define mpeerPrint(...) \ + { tprintf("MND-MPEER ", 255, __VA_ARGS__); } + #define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__) #define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__) #define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__) @@ -69,11 +84,6 @@ typedef struct { char *row; } SSdbUpdate; -typedef struct { - char numOfTables; - uint64_t version[]; -} SSdbSync; - typedef struct { SSdbHeader header; int maxRows; @@ -109,23 +119,6 @@ typedef struct { char data[]; } SRowHead; -typedef struct { - char * buffer; - char * offset; - int trans; - int bufferSize; - pthread_mutex_t qmutex; -} STranQueue; - -typedef struct { - char status; - char role; - char numOfMnodes; - uint64_t dbVersion; - uint32_t numOfDnodes; - uint32_t publicIp; -} SMnodeStatus; - typedef struct { uint8_t dbId; char type; @@ -139,8 +132,8 @@ extern int sdbMaxPeers; extern int sdbNumOfTables; extern int64_t sdbVersion; -int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen); -int sdbRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); +int mpeerForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen); +int mpeerRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); void sdbResetTable(SSdbTable *pTable); extern const int16_t sdbFileVersion; diff --git a/src/sdb/src/sdbEngine.c b/src/sdb/src/sdbEngine.c index 4b000a30eb..0196ff084f 100644 --- a/src/sdb/src/sdbEngine.c +++ b/src/sdb/src/sdbEngine.c @@ -23,7 +23,6 @@ extern char version[]; const int16_t sdbFileVersion = 0; -int sdbExtConns = 0; SRpcIpSet *pSdbIpList = NULL; SRpcIpSet *pSdbPublicIpList = NULL; SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self @@ -431,7 +430,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) { + if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) { pTable->id++; sdbVersion++; if (pTable->keyType == SDB_KEYTYPE_AUTO) { @@ -548,7 +547,7 @@ int sdbDeleteRow(void *handle, void *row) { pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { + if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { pTable->id++; sdbVersion++; @@ -666,7 +665,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { + if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { pTable->id++; sdbVersion++; @@ -745,7 +744,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) { } pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_BATCH_UPDATE, row, rowSize) == 0) { + if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_BATCH_UPDATE, row, rowSize) == 0) { /* // write action */ /* write(pTable->fd, &action, sizeof(action)); */ /* pTable->size += sizeof(action); */ diff --git a/src/sdb/src/sdbstr.c b/src/sdb/src/sdbstr.c index 90bf2a3d43..6df779dff7 100644 --- a/src/sdb/src/sdbstr.c +++ b/src/sdb/src/sdbstr.c @@ -24,7 +24,7 @@ char* sdbRoleStr[] = {"unauthed", "undecided", "master", "slave", "null"}; /* * Lite Version sync request is always successful */ -int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen) { +int mpeerForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen) { return 0; } -- GitLab