提交 1e33660c 编写于 作者: S slguan

Merge branch 'feature/balance' into feature/sync

# Conflicts:
#	src/vnode/main/inc/vnodeInt.h
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include "dnodeRead.h" #include "dnodeRead.h"
#include "dnodeShell.h" #include "dnodeShell.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "mgmtGrant.h" #include "tgrant.h"
static int32_t dnodeInitSystem(); static int32_t dnodeInitSystem();
static int32_t dnodeInitStorage(); static int32_t dnodeInitStorage();
...@@ -220,7 +220,6 @@ static int32_t dnodeInitStorage() { ...@@ -220,7 +220,6 @@ static int32_t dnodeInitStorage() {
sprintf(tsMnodeDir, "%s/mnode", dataDir); sprintf(tsMnodeDir, "%s/mnode", dataDir);
sprintf(tsVnodeDir, "%s/vnode", dataDir); sprintf(tsVnodeDir, "%s/vnode", dataDir);
sprintf(tsDnodeDir, "%s/dnode", dataDir); sprintf(tsDnodeDir, "%s/dnode", dataDir);
mkdir(tsMnodeDir, 0755);
mkdir(tsVnodeDir, 0755); mkdir(tsVnodeDir, 0755);
mkdir(tsDnodeDir, 0755); mkdir(tsDnodeDir, 0755);
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tstatus.h"
#include "tsdb.h" #include "tsdb.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
......
...@@ -21,7 +21,6 @@ extern "C" { ...@@ -21,7 +21,6 @@ extern "C" {
#endif #endif
#include "os.h" #include "os.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "taoserror.h" #include "taoserror.h"
...@@ -40,7 +39,8 @@ extern "C" { ...@@ -40,7 +39,8 @@ extern "C" {
struct _vg_obj; struct _vg_obj;
struct _db_obj; struct _db_obj;
struct _acctObj; struct _acct_obj;
struct _user_obj;
typedef struct { typedef struct {
int32_t mnodeId; int32_t mnodeId;
...@@ -65,7 +65,7 @@ typedef struct { ...@@ -65,7 +65,7 @@ typedef struct {
void *pSync; void *pSync;
} SMnodeObj; } SMnodeObj;
typedef struct { typedef struct _dnode_obj {
int32_t dnodeId; int32_t dnodeId;
uint32_t privateIp; uint32_t privateIp;
uint32_t publicIp; uint32_t publicIp;
...@@ -79,16 +79,15 @@ typedef struct { ...@@ -79,16 +79,15 @@ typedef struct {
uint16_t slot; uint16_t slot;
uint16_t numOfCores; // from dnode status msg uint16_t numOfCores; // from dnode status msg
int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t lbStatus; // set in balance function int8_t status; // set in balance function
float lbScore; // calc in balance function
int32_t customScore; // config by user int32_t customScore; // config by user
char dnodeName[TSDB_DNODE_NAME_LEN + 1]; char dnodeName[TSDB_DNODE_NAME_LEN + 1];
int8_t reserved[15]; int8_t reserved[15];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount; int32_t refCount;
SVnodeLoad vload[TSDB_MAX_VNODES]; SVnodeLoad vload[TSDB_MAX_VNODES];
int32_t status;
uint32_t lastReboot; // time stamp for last reboot uint32_t lastReboot; // time stamp for last reboot
float score; // calc in balance function
float diskAvailable; // from dnode status msg float diskAvailable; // from dnode status msg
int16_t diskAvgUsage; // calc from sys.disk int16_t diskAvgUsage; // calc from sys.disk
int16_t cpuAvgUsage; // calc from sys.cpu int16_t cpuAvgUsage; // calc from sys.cpu
...@@ -147,9 +146,9 @@ typedef struct _vg_obj { ...@@ -147,9 +146,9 @@ typedef struct _vg_obj {
int64_t createdTime; int64_t createdTime;
SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT]; SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT];
int32_t numOfVnodes; int32_t numOfVnodes;
int32_t lbIp; int32_t lbDnodeId;
int32_t lbTime; int32_t lbTime;
int8_t lbStatus; int8_t status;
int8_t reserved[14]; int8_t reserved[14];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount; int32_t refCount;
...@@ -162,7 +161,7 @@ typedef struct _vg_obj { ...@@ -162,7 +161,7 @@ typedef struct _vg_obj {
typedef struct _db_obj { typedef struct _db_obj {
char name[TSDB_DB_NAME_LEN + 1]; char name[TSDB_DB_NAME_LEN + 1];
int8_t dirty; int8_t status;
int64_t createdTime; int64_t createdTime;
SDbCfg cfg; SDbCfg cfg;
int8_t reserved[15]; int8_t reserved[15];
...@@ -173,7 +172,7 @@ typedef struct _db_obj { ...@@ -173,7 +172,7 @@ typedef struct _db_obj {
int32_t numOfSuperTables; int32_t numOfSuperTables;
SVgObj *pHead; SVgObj *pHead;
SVgObj *pTail; SVgObj *pTail;
struct _acctObj *pAcct; struct _acct_obj *pAcct;
} SDbObj; } SDbObj;
typedef struct _user_obj { typedef struct _user_obj {
...@@ -186,7 +185,7 @@ typedef struct _user_obj { ...@@ -186,7 +185,7 @@ typedef struct _user_obj {
int8_t reserved[13]; int8_t reserved[13];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount; int32_t refCount;
struct _acctObj * pAcct; struct _acct_obj * pAcct;
SQqueryList * pQList; // query list SQqueryList * pQList; // query list
SStreamList * pSList; // stream list SStreamList * pSList; // stream list
} SUserObj; } SUserObj;
...@@ -209,7 +208,7 @@ typedef struct { ...@@ -209,7 +208,7 @@ typedef struct {
int8_t accessState; // Checked by mgmt heartbeat message int8_t accessState; // Checked by mgmt heartbeat message
} SAcctInfo; } SAcctInfo;
typedef struct _acctObj { typedef struct _acct_obj {
char user[TSDB_USER_LEN + 1]; char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1]; char pass[TSDB_KEY_LEN + 1];
SAcctCfg cfg; SAcctCfg cfg;
......
...@@ -13,31 +13,34 @@ ...@@ -13,31 +13,34 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_MGMT_ACCT_H #ifndef TDENGINE_ACCT_H
#define TDENGINE_MGMT_ACCT_H #define TDENGINE_ACCT_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "mnode.h"
struct _acct_obj;
struct _user_obj;
struct _db_obj;
typedef enum { typedef enum {
TSDB_ACCT_USER, TSDB_ACCT_USER,
TSDB_ACCT_DB, TSDB_ACCT_DB,
TSDB_ACCT_TABLE TSDB_ACCT_TABLE
} EAcctGrantType; } EAcctGrantType;
int32_t acctInit(); int32_t acctInit();
void acctCleanUp(); void acctCleanUp();
SAcctObj *acctGetAcct(char *acctName); void *acctGetAcct(char *acctName);
void acctIncRef(SAcctObj *pAcct); void acctIncRef(struct _acct_obj *pAcct);
void acctDecRef(SAcctObj *pAcct); void acctReleaseAcct(struct _acct_obj *pAcct);
int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type); int32_t acctCheck(struct _acct_obj *pAcct, EAcctGrantType type);
void acctAddDb(SAcctObj *pAcct, SDbObj *pDb); void acctAddDb(struct _acct_obj *pAcct, struct _db_obj *pDb);
void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb); void acctRemoveDb(struct _acct_obj *pAcct, struct _db_obj *pDb);
void acctAddUser(SAcctObj *pAcct, SUserObj *pUser); void acctAddUser(struct _acct_obj *pAcct, struct _user_obj *pUser);
void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser); void acctRemoveUser(struct _acct_obj *pAcct, struct _user_obj *pUser);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -13,18 +13,22 @@ ...@@ -13,18 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_MGMT_BALANCE_H #ifndef TDENGINE_ADMIN_H
#define TDENGINE_MGMT_BALANCE_H #define TDENGINE_ADMIN_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "mnode.h"
int32_t mgmtInitBalance(); #include <stdint.h>
void mgmtCleanupBalance(); #include <stdbool.h>
void mgmtBalanceNotify() ;
int32_t mgmtAllocVnodes(SVgObj *pVgroup); void adminInit();
struct _http_server_obj_;
extern void (*adminInitHandleFp)(struct _http_server_obj_* pServer);
extern void (*opInitHandleFp)(struct _http_server_obj_* pServer);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -515,7 +515,7 @@ typedef struct { ...@@ -515,7 +515,7 @@ typedef struct {
int64_t compStorage; int64_t compStorage;
int64_t pointsWritten; int64_t pointsWritten;
uint8_t status; uint8_t status;
uint8_t syncStatus; uint8_t role;
uint8_t accessState; uint8_t accessState;
uint8_t reserved[5]; uint8_t reserved[5];
} SVnodeLoad; } SVnodeLoad;
......
...@@ -13,22 +13,26 @@ ...@@ -13,22 +13,26 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_MGMT_DNODE_H #ifndef TDENGINE_BALANCE_H
#define TDENGINE_MGMT_DNODE_H #define TDENGINE_BALANCE_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "mnode.h"
int32_t mgmtInitDnodes(); #include <stdint.h>
void mgmtCleanUpDnodes(); #include <stdbool.h>
int32_t mgmtGetDnodesNum(); #include <pthread.h>
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
void mgmtIncDnodeRef(SDnodeObj *pDnode); struct _db_obj;
void mgmtDecDnodeRef(SDnodeObj *pDnode); struct _vg_obj;
SDnodeObj* mgmtGetDnode(int32_t dnodeId); struct _dnode_obj;
SDnodeObj* mgmtGetDnodeByIp(uint32_t ip);
int32_t balanceInit();
void balanceCleanUp();
void balanceNotify();
int32_t balanceAllocVnodes(struct _vg_obj *pVgroup);
int32_t balanceDropDnode(struct _dnode_obj *pDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_CLUSTER_H
#define TDENGINE_CLUSTER_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
struct _dnode_obj;
enum _TAOS_DN_STATUS {
TAOS_DN_STATUS_OFFLINE,
TAOS_DN_STATUS_DROPPING,
TAOS_DN_STATUS_BALANCING,
TAOS_DN_STATUS_READY
};
int32_t clusterInit();
void clusterCleanUp();
char* clusterGetDnodeStatusStr(int32_t dnodeStatus);
int32_t clusterInitDnodes();
void clusterCleanupDnodes();
int32_t clusterGetDnodesNum();
void * clusterGetNextDnode(void *pNode, struct _dnode_obj **pDnode);
void clusterReleaseDnode(struct _dnode_obj *pDnode);
void * clusterGetDnode(int32_t dnodeId);
void * clusterGetDnodeByIp(uint32_t ip);
void clusterUpdateDnode(struct _dnode_obj *pDnode);
int32_t clusterDropDnode(struct _dnode_obj *pDnode);
#ifdef __cplusplus
}
#endif
#endif
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_MGMT_GRANT_H #ifndef TDENGINE_GTANT_H
#define TDENGINE_MGMT_GRANT_H #define TDENGINE_GTANT_H
#ifdef __cplusplus #ifdef __cplusplus
"C" { "C" {
......
...@@ -20,6 +20,14 @@ ...@@ -20,6 +20,14 @@
extern "C" { extern "C" {
#endif #endif
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_CREATING,
TAOS_VN_STATUS_READY,
TAOS_VN_STATUS_CLOSING,
TAOS_VN_STATUS_DELETING,
} EVnStatus;
typedef struct { typedef struct {
int len; int len;
int code; int code;
...@@ -41,7 +49,7 @@ void* vnodeGetWqueue(int32_t vgId); ...@@ -41,7 +49,7 @@ void* vnodeGetWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
void* vnodeGetTsdb(void *pVnode); void* vnodeGetTsdb(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
void vnodeBuildStatusMsg(void * param); void vnodeBuildStatusMsg(void * param);
int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret); int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret);
......
...@@ -22,13 +22,18 @@ extern "C" { ...@@ -22,13 +22,18 @@ extern "C" {
#include "mnode.h" #include "mnode.h"
enum _TSDB_DB_STATUS {
TSDB_DB_STATUS_READY,
TSDB_DB_STATUS_DROPPING
};
// api // api
int32_t mgmtInitDbs(); int32_t mgmtInitDbs();
void mgmtCleanUpDbs(); void mgmtCleanUpDbs();
SDbObj *mgmtGetDb(char *db); SDbObj *mgmtGetDb(char *db);
SDbObj *mgmtGetDbByTableId(char *db); SDbObj *mgmtGetDbByTableId(char *db);
void mgmtIncDbRef(SDbObj *pDb); void mgmtIncDbRef(SDbObj *pDb);
void mgmtDecDbRef(SDbObj *pDb); void mgmtReleaseDb(SDbObj *pDb);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb); bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
void mgmtDropAllDbs(SAcctObj *pAcct); void mgmtDropAllDbs(SAcctObj *pAcct);
......
...@@ -20,6 +20,19 @@ ...@@ -20,6 +20,19 @@
extern "C" { extern "C" {
#endif #endif
enum _TSDB_MN_STATUS {
TSDB_MN_STATUS_OFFLINE,
TSDB_MN_STATUS_UNSYNCED,
TSDB_MN_STATUS_SYNCING,
TSDB_MN_STATUS_SERVING
};
enum _TSDB_MN_ROLE {
TSDB_MN_ROLE_UNDECIDED,
TSDB_MN_ROLE_SLAVE,
TSDB_MN_ROLE_MASTER
};
int32_t mgmtInitMnodes(); int32_t mgmtInitMnodes();
void mgmtCleanupMnodes(); void mgmtCleanupMnodes();
......
...@@ -24,8 +24,7 @@ extern "C" { ...@@ -24,8 +24,7 @@ extern "C" {
int32_t mgmtInitUsers(); int32_t mgmtInitUsers();
void mgmtCleanUpUsers(); void mgmtCleanUpUsers();
SUserObj *mgmtGetUser(char *name); SUserObj *mgmtGetUser(char *name);
void mgmtIncUserRef(SUserObj *pUser); void mgmtReleaseUser(SUserObj *pUser);
void mgmtDecUserRef(SUserObj *pUser);
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp); SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp);
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass); int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
void mgmtDropAllUsers(SAcctObj *pAcct); void mgmtDropAllUsers(SAcctObj *pAcct);
......
...@@ -24,13 +24,20 @@ extern "C" { ...@@ -24,13 +24,20 @@ extern "C" {
#include <stdbool.h> #include <stdbool.h>
#include "mnode.h" #include "mnode.h"
enum _TSDB_VG_STATUS {
TSDB_VG_STATUS_READY,
TSDB_VG_STATUS_UPDATE
};
int32_t mgmtInitVgroups(); int32_t mgmtInitVgroups();
void mgmtCleanUpVgroups(); void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroup(int32_t vgId);
void mgmtIncVgroupRef(SVgObj *pVgroup); void mgmtReleaseVgroup(SVgObj *pVgroup);
void mgmtDecVgroupRef(SVgObj *pVgroup);
void mgmtDropAllVgroups(SDbObj *pDropDb); void mgmtDropAllVgroups(SDbObj *pDropDb);
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb); void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle); void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle);
......
...@@ -17,9 +17,10 @@ ...@@ -17,9 +17,10 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "taccount.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtUser.h" #include "mgmtUser.h"
#ifndef _ACCOUNT #ifndef _ACCOUNT
static SAcctObj tsAcctObj = {0}; static SAcctObj tsAcctObj = {0};
...@@ -30,11 +31,12 @@ int32_t acctInit() { ...@@ -30,11 +31,12 @@ int32_t acctInit() {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void acctCleanUp() {} void acctCleanUp() {}
SAcctObj *acctGetAcct(char *acctName) { return &tsAcctObj; } void *acctGetAcct(char *acctName) { return &tsAcctObj; }
void acctIncRef(SAcctObj *pAcct) {} void acctIncRef(struct _acct_obj *pAcct) {}
void acctDecRef(SAcctObj *pAcct) {} void acctReleaseAcct(SAcctObj *pAcct) {}
int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type) { return TSDB_CODE_SUCCESS; } int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type) { return TSDB_CODE_SUCCESS; }
#endif #endif
void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) { void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) {
...@@ -46,7 +48,7 @@ void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) { ...@@ -46,7 +48,7 @@ void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) {
void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) { void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) {
atomic_sub_fetch_32(&pAcct->acctInfo.numOfDbs, 1); atomic_sub_fetch_32(&pAcct->acctInfo.numOfDbs, 1);
pDb->pAcct = NULL; pDb->pAcct = NULL;
acctIncRef(pAcct); acctReleaseAcct(pAcct);
} }
void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) { void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) {
...@@ -58,5 +60,5 @@ void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) { ...@@ -58,5 +60,5 @@ void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) {
void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) { void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) {
atomic_sub_fetch_32(&pAcct->acctInfo.numOfUsers, 1); atomic_sub_fetch_32(&pAcct->acctInfo.numOfUsers, 1);
pUser->pAcct = NULL; pUser->pAcct = NULL;
acctIncRef(pAcct); acctReleaseAcct(pAcct);
} }
\ No newline at end of file
...@@ -14,56 +14,34 @@ ...@@ -14,56 +14,34 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tstatus.h" #include "tbalance.h"
#include "mgmtBalance.h" #include "mnode.h"
#include "mgmtDnode.h" #include "tcluster.h"
#include "mgmtVgroup.h"
extern int32_t balanceInit(); #ifndef _VPEER
extern void balanceCleanUp(); int32_t balanceInit() { return 0; }
extern void balanceNotify(); void balanceCleanUp() {}
extern int32_t balanceAllocVnodes(SVgObj *pVgroup); void balanceNotify() {}
int32_t mgmtInitBalance() { int32_t balanceAllocVnodes(SVgObj *pVgroup) {
#ifdef _VPEER
return balanceInit();
#else
return 0;
#endif
}
void mgmtCleanupBalance() {
#ifdef _VPEER
balanceCleanUp();
#endif
}
void mgmtBalanceNotify() {
#ifdef _VPEER
balanceNotify();
#endif
}
int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
#ifdef _VPEER
return balanceAllocVnodes(pVgroup);
#else
void * pNode = NULL; void * pNode = NULL;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
SDnodeObj *pSelDnode = NULL; SDnodeObj *pSelDnode = NULL;
float vnodeUsage = 1.0; float vnodeUsage = 1.0;
while (1) { while (1) {
mgmtDecDnodeRef(pDnode); pNode = clusterGetNextDnode(pNode, &pDnode);
pNode = mgmtGetNextDnode(pNode, &pDnode);
if (pDnode == NULL) break; if (pDnode == NULL) break;
if (pDnode->numOfTotalVnodes <= 0) continue;
if (pDnode->openVnodes == pDnode->numOfTotalVnodes) continue;
float usage = (float)pDnode->openVnodes / pDnode->numOfTotalVnodes; if (pDnode->numOfTotalVnodes > 0 && pDnode->openVnodes < pDnode->numOfTotalVnodes) {
if (usage <= vnodeUsage) { float usage = (float)pDnode->openVnodes / pDnode->numOfTotalVnodes;
pSelDnode = pDnode; if (usage <= vnodeUsage) {
vnodeUsage = usage; pSelDnode = pDnode;
vnodeUsage = usage;
}
} }
clusterReleaseDnode(pDnode);
} }
if (pSelDnode == NULL) { if (pSelDnode == NULL) {
...@@ -77,5 +55,6 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { ...@@ -77,5 +55,6 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes); mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
#endif
} }
#endif
...@@ -17,15 +17,14 @@ ...@@ -17,15 +17,14 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tsched.h" #include "tsched.h"
#include "tstatus.h"
#include "tsystem.h" #include "tsystem.h"
#include "tutil.h" #include "tutil.h"
#include "dnode.h" #include "dnode.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtBalance.h" #include "tbalance.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnode.h" #include "tcluster.h"
#include "mgmtGrant.h" #include "tgrant.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtTable.h" #include "mgmtTable.h"
......
...@@ -18,15 +18,14 @@ ...@@ -18,15 +18,14 @@
#include "taoserror.h" #include "taoserror.h"
#include "trpc.h" #include "trpc.h"
#include "tsched.h" #include "tsched.h"
#include "tstatus.h"
#include "tsystem.h" #include "tsystem.h"
#include "tutil.h" #include "tutil.h"
#include "dnode.h" #include "dnode.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtBalance.h" #include "tbalance.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "mgmtGrant.h" #include "tgrant.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtTable.h" #include "mgmtTable.h"
......
...@@ -16,15 +16,14 @@ ...@@ -16,15 +16,14 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tstatus.h"
#include "tutil.h" #include "tutil.h"
#include "name.h" #include "name.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "taccount.h"
#include "mgmtBalance.h" #include "tbalance.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnode.h" #include "tcluster.h"
#include "mgmtGrant.h" #include "tgrant.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
...@@ -38,7 +37,7 @@ static int32_t tsDbUpdateSize; ...@@ -38,7 +37,7 @@ static int32_t tsDbUpdateSize;
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static void mgmtDropDb(SQueuedMsg *newMsg); static void mgmtDropDb(SQueuedMsg *newMsg);
static int32_t mgmtSetDbDirty(SDbObj *pDb); static int32_t mgmtSetDbDropping(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg);
...@@ -146,11 +145,11 @@ SDbObj *mgmtGetDb(char *db) { ...@@ -146,11 +145,11 @@ SDbObj *mgmtGetDb(char *db) {
return (SDbObj *)sdbGetRow(tsDbSdb, db); return (SDbObj *)sdbGetRow(tsDbSdb, db);
} }
void mgmtIncDbRef(SDbObj *pDb) { void mgmtIncDbRef(SDbObj *pDb) {
return sdbIncRef(tsDbSdb, pDb); return sdbIncRef(tsDbSdb, pDb);
} }
void mgmtDecDbRef(SDbObj *pDb) { void mgmtReleaseDb(SDbObj *pDb) {
return sdbDecRef(tsDbSdb, pDb); return sdbDecRef(tsDbSdb, pDb);
} }
...@@ -289,7 +288,7 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { ...@@ -289,7 +288,7 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
SDbObj *pDb = mgmtGetDb(pCreate->db); SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb != NULL) { if (pDb != NULL) {
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
return TSDB_CODE_DB_ALREADY_EXIST; return TSDB_CODE_DB_ALREADY_EXIST;
} }
...@@ -519,7 +518,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) ...@@ -519,7 +518,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->numOfRows = pUser->pAcct->acctInfo.numOfDbs; pShow->numOfRows = pUser->pAcct->acctInfo.numOfDbs;
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return 0; return 0;
} }
...@@ -631,15 +630,15 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * ...@@ -631,15 +630,15 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pDb->dirty != TSDB_DB_STATUS_READY ? "dropping" : "ready"); strcpy(pWrite, pDb->status != TSDB_DB_STATUS_READY ? "dropping" : "ready");
cols++; cols++;
numOfRows++; numOfRows++;
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return numOfRows; return numOfRows;
} }
...@@ -659,10 +658,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) { ...@@ -659,10 +658,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, -1); atomic_add_fetch_32(&pDb->numOfTables, -1);
} }
static int32_t mgmtSetDbDirty(SDbObj *pDb) { static int32_t mgmtSetDbDropping(SDbObj *pDb) {
if (pDb->dirty) return TSDB_CODE_SUCCESS; if (pDb->status) return TSDB_CODE_SUCCESS;
pDb->dirty = true; pDb->status = true;
SSdbOperDesc oper = { SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL, .type = SDB_OPER_TYPE_GLOBAL,
.table = tsDbSdb, .table = tsDbSdb,
...@@ -850,7 +849,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { ...@@ -850,7 +849,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
return; return;
} }
int32_t code = mgmtSetDbDirty(pDb); int32_t code = mgmtSetDbDropping(pDb);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code)); mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code));
mgmtSendSimpleResp(pMsg->thandle, code); mgmtSendSimpleResp(pMsg->thandle, code);
...@@ -881,11 +880,11 @@ void mgmtDropAllDbs(SAcctObj *pAcct) { ...@@ -881,11 +880,11 @@ void mgmtDropAllDbs(SAcctObj *pAcct) {
if (pDb == NULL) break; if (pDb == NULL) break;
if (pDb->pAcct == pAcct) { if (pDb->pAcct == pAcct) {
mgmtSetDbDirty(pDb); mgmtSetDbDropping(pDb);
numOfDbs++; numOfDbs++;
} }
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
} }
mTrace("acct:%s, all dbs is is set dirty", pAcct->user, numOfDbs); mTrace("acct:%s, all dbs is is set dirty", pAcct->user, numOfDbs);
} }
\ No newline at end of file
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tmodule.h" #include "tmodule.h"
#include "tstatus.h" #include "tbalance.h"
#include "mgmtBalance.h" #include "tcluster.h"
#include "mgmtDnode.h" #include "mnode.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtShell.h" #include "mgmtShell.h"
...@@ -26,52 +26,29 @@ ...@@ -26,52 +26,29 @@
#include "mgmtUser.h" #include "mgmtUser.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg); static void clusterProcessCfgDnodeMsg(SQueuedMsg *pMsg);
static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) ; static void clusterProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) ;
static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg); static void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg);
static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t clusterGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t clusterRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t clusterGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t clusterRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t clusterRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t clusterGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
extern int32_t clusterInit();
extern void clusterCleanUp();
extern int32_t clusterGetDnodesNum();
extern void * clusterGetNextDnode(void *pNode, SDnodeObj **pDnode);
extern void clusterIncDnodeRef(SDnodeObj *pDnode);
extern void clusterDecDnodeRef(SDnodeObj *pDnode);
extern SDnodeObj* clusterGetDnode(int32_t dnodeId);
extern SDnodeObj* clusterGetDnodeByIp(uint32_t ip);
#ifndef _CLUSTER #ifndef _CLUSTER
static SDnodeObj tsDnodeObj = {0}; static SDnodeObj tsDnodeObj = {0};
#endif
int32_t mgmtInitDnodes() { int32_t clusterInitDnodes() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtRetrieveConfigs);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VNODES, mgmtGetVnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, mgmtRetrieveVnodes);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DNODE, mgmtGetDnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DNODE, mgmtRetrieveDnodes);
#ifdef _CLUSTER
return clusterInit();
#else
tsDnodeObj.dnodeId = 1; tsDnodeObj.dnodeId = 1;
tsDnodeObj.privateIp = inet_addr(tsPrivateIp); tsDnodeObj.privateIp = inet_addr(tsPrivateIp);
tsDnodeObj.publicIp = inet_addr(tsPublicIp); tsDnodeObj.publicIp = inet_addr(tsPublicIp);
tsDnodeObj.createdTime = taosGetTimestampMs(); tsDnodeObj.createdTime = taosGetTimestampMs();
tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
tsDnodeObj.status = TSDB_DN_STATUS_OFFLINE; tsDnodeObj.status = TAOS_DN_STATUS_OFFLINE;
tsDnodeObj.lastReboot = taosGetTimestampSec(); tsDnodeObj.lastReboot = taosGetTimestampSec();
sprintf(tsDnodeObj.dnodeName, "%d", tsDnodeObj.dnodeId); sprintf(tsDnodeObj.dnodeName, "%d", tsDnodeObj.dnodeId);
...@@ -83,69 +60,47 @@ int32_t mgmtInitDnodes() { ...@@ -83,69 +60,47 @@ int32_t mgmtInitDnodes() {
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MONITOR); tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MONITOR);
} }
return 0; return 0;
#endif
} }
void mgmtCleanUpDnodes() { void *clusterGetNextDnode(void *pNode, SDnodeObj **pDnode) {
#ifdef _CLUSTER if (*pDnode == NULL) {
clusterCleanUp(); *pDnode = &tsDnodeObj;
#endif
}
SDnodeObj *mgmtGetDnode(int32_t dnodeId) {
#ifdef _CLUSTER
return clusterGetDnode(dnodeId);
#else
if (dnodeId == 1) {
return &tsDnodeObj;
} else { } else {
return NULL; *pDnode = NULL;
} }
#endif return *pDnode;
}
SDnodeObj *mgmtGetDnodeByIp(uint32_t ip) {
#ifdef _CLUSTER
return clusterGetDnodeByIp(ip);
#else
return &tsDnodeObj;
#endif
} }
int32_t mgmtGetDnodesNum() { void clusterCleanupDnodes() {}
#ifdef _CLUSTER int32_t clusterGetDnodesNum() { return 1; }
return clusterGetDnodesNum(); void * clusterGetDnode(int32_t dnodeId) { return dnodeId == 1 ? &tsDnodeObj : NULL; }
#else void * clusterGetDnodeByIp(uint32_t ip) { return &tsDnodeObj; }
return 1; void clusterReleaseDnode(struct _dnode_obj *pDnode) {}
#endif void clusterUpdateDnode(struct _dnode_obj *pDnode) {}
}
void mgmtIncDnodeRef(SDnodeObj *pDnode) {
#ifdef _CLUSTER
return clusterIncDnodeRef(pDnode);
#endif #endif
}
void mgmtDecDnodeRef(SDnodeObj *pDnode) { int32_t clusterInit() {
#ifdef _CLUSTER mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, clusterProcessCfgDnodeMsg);
return clusterDecDnodeRef(pDnode); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, clusterProcessCfgDnodeMsgRsp);
#endif mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, clusterProcessDnodeStatusMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, clusterGetModuleMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, clusterRetrieveModules);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, clusterGetConfigMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, clusterRetrieveConfigs);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VNODES, clusterGetVnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, clusterRetrieveVnodes);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DNODE, clusterGetDnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DNODE, clusterRetrieveDnodes);
return clusterInitDnodes();
} }
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode) { void clusterCleanUp() {
#ifdef _CLUSTER clusterCleanupDnodes();
return clusterGetNextDnode(pNode, pDnode);
#else
if (*pDnode == NULL) {
*pDnode = &tsDnodeObj;
} else {
*pDnode = NULL;
}
return *pDnode;
#endif
} }
void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { void clusterProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont; SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont;
...@@ -181,11 +136,11 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { ...@@ -181,11 +136,11 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { static void clusterProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) {
mPrint("cfg vnode rsp is received, result:%s", tstrerror(rpcMsg->code)); mPrint("cfg vnode rsp is received, result:%s", tstrerror(rpcMsg->code));
} }
void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (mgmtCheckRedirect(rpcMsg->handle)) return; if (mgmtCheckRedirect(rpcMsg->handle)) return;
SDMStatusMsg *pStatus = rpcMsg->pCont; SDMStatusMsg *pStatus = rpcMsg->pCont;
...@@ -205,14 +160,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -205,14 +160,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
pDnode = mgmtGetDnodeByIp(pStatus->privateIp); pDnode = clusterGetDnodeByIp(pStatus->privateIp);
if (pDnode == NULL) { if (pDnode == NULL) {
mTrace("dnode not created, privateIp:%s", taosIpStr(pStatus->privateIp)); mTrace("dnode not created, privateIp:%s", taosIpStr(pStatus->privateIp));
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
return; return;
} }
} else { } else {
pDnode = mgmtGetDnode(pStatus->dnodeId); pDnode = clusterGetDnode(pStatus->dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
mError("dnode:%d, not exist, privateIp:%s", pStatus->dnodeId, taosIpStr(pStatus->privateIp)); mError("dnode:%d, not exist, privateIp:%s", pStatus->dnodeId, taosIpStr(pStatus->privateIp));
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
...@@ -245,16 +200,16 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -245,16 +200,16 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId); mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId);
mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL); mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL);
} }
mgmtDecVgroupRef(pVgroup); mgmtReleaseVgroup(pVgroup);
} }
if (pDnode->status != TSDB_DN_STATUS_READY) { if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
mTrace("dnode:%d, from offline to online", pDnode->dnodeId); mTrace("dnode:%d, from offline to online", pDnode->dnodeId);
pDnode->status = TSDB_DN_STATUS_READY; pDnode->status = TAOS_DN_STATUS_READY;
mgmtBalanceNotify(); balanceNotify();
} }
mgmtDecDnodeRef(pDnode); clusterReleaseDnode(pDnode);
int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess); int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess);
SDMStatusRsp *pRsp = rpcMallocCont(contLen); SDMStatusRsp *pRsp = rpcMallocCont(contLen);
...@@ -284,7 +239,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -284,7 +239,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t clusterGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
...@@ -351,16 +306,16 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo ...@@ -351,16 +306,16 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
} }
pShow->numOfRows = mgmtGetDnodesNum(); pShow->numOfRows = clusterGetDnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL; pShow->pNode = NULL;
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return 0; return 0;
} }
static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t cols = 0; int32_t cols = 0;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
...@@ -368,8 +323,8 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -368,8 +323,8 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
char ipstr[32]; char ipstr[32];
while (numOfRows < rows) { while (numOfRows < rows) {
mgmtDecDnodeRef(pDnode); clusterReleaseDnode(pDnode);
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); pShow->pNode = clusterGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break; if (pDnode == NULL) break;
cols = 0; cols = 0;
...@@ -393,7 +348,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -393,7 +348,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetDnodeStatusStr(pDnode->status) ); strcpy(pWrite, clusterGetDnodeStatusStr(pDnode->status));
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
...@@ -406,7 +361,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -406,7 +361,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
#ifdef _VPEER #ifdef _VPEER
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetDnodeLbStatusStr(pDnode->lbStatus)); strcpy(pWrite, clusterGetDnodeStatusStr(pDnode->status));
cols++; cols++;
#endif #endif
...@@ -422,7 +377,7 @@ static bool clusterCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { ...@@ -422,7 +377,7 @@ static bool clusterCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
return status > 0; return status > 0;
} }
static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t clusterGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
...@@ -461,7 +416,7 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -461,7 +416,7 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->numOfRows = 0; pShow->numOfRows = 0;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
while (1) { while (1) {
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); pShow->pNode = clusterGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break; if (pDnode == NULL) break;
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
if (clusterCheckModuleInDnode(pDnode, moduleType)) { if (clusterCheckModuleInDnode(pDnode, moduleType)) {
...@@ -472,12 +427,12 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -472,12 +427,12 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL; pShow->pNode = NULL;
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return 0; return 0;
} }
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t clusterRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
char * pWrite; char * pWrite;
...@@ -485,8 +440,8 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -485,8 +440,8 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
char ipstr[20]; char ipstr[20];
while (numOfRows < rows) { while (numOfRows < rows) {
mgmtDecDnodeRef(pDnode); clusterReleaseDnode(pDnode);
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); pShow->pNode = clusterGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break; if (pDnode == NULL) break;
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
...@@ -506,7 +461,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -506,7 +461,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetDnodeStatusStr(pDnode->status) ); strcpy(pWrite, clusterGetDnodeStatusStr(pDnode->status));
cols++; cols++;
numOfRows++; numOfRows++;
...@@ -523,7 +478,7 @@ static bool clusterCheckConfigShow(SGlobalConfig *cfg) { ...@@ -523,7 +478,7 @@ static bool clusterCheckConfigShow(SGlobalConfig *cfg) {
return true; return true;
} }
static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t clusterGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
...@@ -560,12 +515,12 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -560,12 +515,12 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL; pShow->pNode = NULL;
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return 0; return 0;
} }
static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn) { static int32_t clusterRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) { for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) {
...@@ -612,7 +567,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -612,7 +567,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo
return numOfRows; return numOfRows;
} }
static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
...@@ -632,12 +587,6 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo ...@@ -632,12 +587,6 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 12;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "sync_status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols); pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
...@@ -647,7 +596,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo ...@@ -647,7 +596,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
if (pShow->payloadLen > 0 ) { if (pShow->payloadLen > 0 ) {
uint32_t ip = ip2uint(pShow->payload); uint32_t ip = ip2uint(pShow->payload);
pDnode = mgmtGetDnodeByIp(ip); pDnode = clusterGetDnodeByIp(ip);
if (NULL == pDnode) { if (NULL == pDnode) {
return TSDB_CODE_NODE_OFFLINE; return TSDB_CODE_NODE_OFFLINE;
} }
...@@ -664,7 +613,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo ...@@ -664,7 +613,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->pNode = pDnode; pShow->pNode = pDnode;
} else { } else {
while (true) { while (true) {
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); pShow->pNode = clusterGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break; if (pDnode == NULL) break;
pShow->numOfRows += pDnode->openVnodes; pShow->numOfRows += pDnode->openVnodes;
...@@ -675,13 +624,13 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo ...@@ -675,13 +624,13 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
} }
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtDecDnodeRef(pDnode); clusterReleaseDnode(pDnode);
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return 0; return 0;
} }
static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { static int32_t clusterRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
char * pWrite; char * pWrite;
...@@ -707,11 +656,7 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -707,11 +656,7 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetVnodeStatusStr(pVnode->status)); strcpy(pWrite, pVnode->status ? "ready" : "offline");
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetVnodeSyncStatusStr(pVnode->syncStatus));
cols++; cols++;
numOfRows++; numOfRows++;
...@@ -724,4 +669,14 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -724,4 +669,14 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
\ No newline at end of file
char* clusterGetDnodeStatusStr(int32_t dnodeStatus) {
switch (dnodeStatus) {
case TAOS_DN_STATUS_OFFLINE: return "offline";
case TAOS_DN_STATUS_DROPPING: return "dropping";
case TAOS_DN_STATUS_BALANCING: return "balancing";
case TAOS_DN_STATUS_READY: return "ready";
default: return "undefined";
}
}
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "mgmtGrant.h" #include "tgrant.h"
int32_t grantInit() { return TSDB_CODE_SUCCESS; } int32_t grantInit() { return TSDB_CODE_SUCCESS; }
void grantCleanUp() {} void grantCleanUp() {}
......
...@@ -19,13 +19,13 @@ ...@@ -19,13 +19,13 @@
#include "tmodule.h" #include "tmodule.h"
#include "tsched.h" #include "tsched.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "taccount.h"
#include "mgmtBalance.h" #include "tbalance.h"
#include "tcluster.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "mgmtGrant.h" #include "tgrant.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
...@@ -89,7 +89,7 @@ int32_t mgmtStartSystem() { ...@@ -89,7 +89,7 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (mgmtInitDnodes() < 0) { if (clusterInit() < 0) {
mError("failed to init dnodes"); mError("failed to init dnodes");
return -1; return -1;
} }
...@@ -122,7 +122,7 @@ int32_t mgmtStartSystem() { ...@@ -122,7 +122,7 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (mgmtInitBalance() < 0) { if (balanceInit() < 0) {
mError("failed to init dnode balance") mError("failed to init dnode balance")
} }
...@@ -148,14 +148,14 @@ void mgmtCleanUpSystem() { ...@@ -148,14 +148,14 @@ void mgmtCleanUpSystem() {
mPrint("starting to clean up mgmt"); mPrint("starting to clean up mgmt");
grantCleanUp(); grantCleanUp();
mgmtCleanupMnodes(); mgmtCleanupMnodes();
mgmtCleanupBalance(); balanceCleanUp();
mgmtCleanUpShell(); mgmtCleanUpShell();
mgmtCleanupDClient(); mgmtCleanupDClient();
mgmtCleanupDServer(); mgmtCleanupDServer();
mgmtCleanUpTables(); mgmtCleanUpTables();
mgmtCleanUpVgroups(); mgmtCleanUpVgroups();
mgmtCleanUpDbs(); mgmtCleanUpDbs();
mgmtCleanUpDnodes(); clusterCleanUp();
mgmtCleanUpUsers(); mgmtCleanUpUsers();
acctCleanUp(); acctCleanUp();
taosTmrCleanUp(tsMgmtTmr); taosTmrCleanUp(tsMgmtTmr);
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tstatus.h"
#include "trpc.h" #include "trpc.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
...@@ -64,6 +63,25 @@ static void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) { ...@@ -64,6 +63,25 @@ static void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) {
return *pMnode; return *pMnode;
} }
char *taosGetMnodeStatusStr(int32_t mnodeStatus) {
switch (mnodeStatus) {
case TSDB_MN_STATUS_OFFLINE: return "offline";
case TSDB_MN_STATUS_UNSYNCED: return "unsynced";
case TSDB_MN_STATUS_SYNCING: return "syncing";
case TSDB_MN_STATUS_SERVING: return "serving";
default: return "undefined";
}
}
char *taosGetMnodeRoleStr(int32_t mnodeRole) {
switch (mnodeRole) {
case TSDB_MN_ROLE_UNDECIDED: return "undicided";
case TSDB_MN_ROLE_SLAVE: return "slave";
case TSDB_MN_ROLE_MASTER: return "master";
default: return "undefined";
}
}
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
...@@ -120,7 +138,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo ...@@ -120,7 +138,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->numOfRows = mgmtGetMnodesNum(); pShow->numOfRows = mgmtGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL; pShow->pNode = NULL;
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return 0; return 0;
} }
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "taccount.h"
#include "tcluster.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
...@@ -787,12 +789,12 @@ void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) { ...@@ -787,12 +789,12 @@ void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
if (pMsg != NULL) { if (pMsg != NULL) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser); if (pMsg->pUser) mgmtReleaseUser(pMsg->pUser);
if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb); if (pMsg->pDb) mgmtReleaseDb(pMsg->pDb);
if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup); if (pMsg->pVgroup) mgmtReleaseVgroup(pMsg->pVgroup);
if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable); if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable);
// if (pMsg->pAcct) acctDecRef(pMsg->pAcct); if (pMsg->pAcct) acctReleaseAcct(pMsg->pAcct);
// if (pMsg->pDnode) mgmtDecTableRef(pMsg->pDnode); if (pMsg->pDnode) clusterReleaseDnode(pMsg->pDnode);
free(pMsg); free(pMsg);
} }
} }
......
...@@ -435,7 +435,7 @@ void sdbIncRef(void *handle, void *pRow) { ...@@ -435,7 +435,7 @@ void sdbIncRef(void *handle, void *pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
atomic_add_fetch_32(pRefCount, 1); atomic_add_fetch_32(pRefCount, 1);
if (0) { if (0 && strcmp(pTable->tableName, "dnodes") == 0) {
sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
} }
} }
...@@ -446,7 +446,7 @@ void sdbDecRef(void *handle, void *pRow) { ...@@ -446,7 +446,7 @@ void sdbDecRef(void *handle, void *pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
if (0) { if (0 && strcmp(pTable->tableName, "dnodes") == 0) {
sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
} }
int8_t* updateEnd = pRow + pTable->refCountPos - 1; int8_t* updateEnd = pRow + pTable->refCountPos - 1;
......
...@@ -19,15 +19,14 @@ ...@@ -19,15 +19,14 @@
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tstatus.h"
#include "tsched.h" #include "tsched.h"
#include "dnode.h" #include "dnode.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "taccount.h"
#include "mgmtBalance.h" #include "tbalance.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnode.h" #include "tcluster.h"
#include "mgmtGrant.h" #include "tgrant.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
...@@ -179,6 +178,28 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -179,6 +178,28 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
} }
} }
char *mgmtGetShowTypeStr(int32_t showType) {
switch (showType) {
case TSDB_MGMT_TABLE_ACCT: return "show accounts";
case TSDB_MGMT_TABLE_USER: return "show users";
case TSDB_MGMT_TABLE_DB: return "show databases";
case TSDB_MGMT_TABLE_TABLE: return "show tables";
case TSDB_MGMT_TABLE_DNODE: return "show dnodes";
case TSDB_MGMT_TABLE_MNODE: return "show mnodes";
case TSDB_MGMT_TABLE_VGROUP: return "show vgroups";
case TSDB_MGMT_TABLE_METRIC: return "show stables";
case TSDB_MGMT_TABLE_MODULE: return "show modules";
case TSDB_MGMT_TABLE_QUERIES: return "show queries";
case TSDB_MGMT_TABLE_STREAMS: return "show streams";
case TSDB_MGMT_TABLE_CONFIGS: return "show configs";
case TSDB_MGMT_TABLE_CONNS: return "show connections";
case TSDB_MGMT_TABLE_SCORES: return "show scores";
case TSDB_MGMT_TABLE_GRANTS: return "show grants";
case TSDB_MGMT_TABLE_VNODES: return "show vnodes";
default: return "undefined";
}
}
static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
SCMShowMsg *pShowMsg = pMsg->pCont; SCMShowMsg *pShowMsg = pMsg->pCont;
if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) {
...@@ -187,7 +208,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { ...@@ -187,7 +208,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
} }
if (!tsMgmtShowMetaFp[pShowMsg->type] || !tsMgmtShowRetrieveFp[pShowMsg->type]) { if (!tsMgmtShowMetaFp[pShowMsg->type] || !tsMgmtShowRetrieveFp[pShowMsg->type]) {
mError("show type:%s is not support", taosGetShowTypeStr(pShowMsg->type)); mError("show type:%s is not support", mgmtGetShowTypeStr(pShowMsg->type));
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT);
return; return;
} }
...@@ -209,7 +230,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { ...@@ -209,7 +230,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
mgmtSaveQhandle(pShow); mgmtSaveQhandle(pShow);
pShowRsp->qhandle = htobe64((uint64_t) pShow); pShowRsp->qhandle = htobe64((uint64_t) pShow);
mTrace("show:%p, type:%s, start to get meta", pShow, taosGetShowTypeStr(pShowMsg->type)); mTrace("show:%p, type:%s, start to get meta", pShow, mgmtGetShowTypeStr(pShowMsg->type));
int32_t code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle); int32_t code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle);
if (code == 0) { if (code == 0) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
...@@ -220,7 +241,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { ...@@ -220,7 +241,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} else { } else {
mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, taosGetShowTypeStr(pShowMsg->type), tstrerror(code)); mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mgmtGetShowTypeStr(pShowMsg->type), tstrerror(code));
mgmtFreeQhandle(pShow); mgmtFreeQhandle(pShow);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->thandle, .handle = pMsg->thandle,
...@@ -248,7 +269,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { ...@@ -248,7 +269,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
} }
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
mTrace("show:%p, type:%s, retrieve data", pShow, taosGetShowTypeStr(pShow->type)); mTrace("show:%p, type:%s, retrieve data", pShow, mgmtGetShowTypeStr(pShow->type));
if (!mgmtCheckQhandle(pRetrieve->qhandle)) { if (!mgmtCheckQhandle(pRetrieve->qhandle)) {
mError("pShow:%p, query memory is corrupted", pShow); mError("pShow:%p, query memory is corrupted", pShow);
...@@ -338,11 +359,11 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr ...@@ -338,11 +359,11 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr
SUserObj *pUser = mgmtGetUser(user); SUserObj *pUser = mgmtGetUser(user);
if (pUser == NULL) { if (pUser == NULL) {
*secret = 0; *secret = 0;
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return TSDB_CODE_INVALID_USER; return TSDB_CODE_INVALID_USER;
} else { } else {
memcpy(secret, pUser->pass, TSDB_KEY_LEN); memcpy(secret, pUser->pass, TSDB_KEY_LEN);
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#include "tscompression.h" #include "tscompression.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "ttime.h" #include "ttime.h"
#include "tstatus.h"
#include "tutil.h" #include "tutil.h"
#include "qast.h" #include "qast.h"
#include "qextbuffer.h" #include "qextbuffer.h"
...@@ -28,15 +27,14 @@ ...@@ -28,15 +27,14 @@
#include "tscompression.h" #include "tscompression.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tstatus.h"
#include "ttime.h" #include "ttime.h"
#include "name.h" #include "name.h"
#include "mgmtAcct.h" #include "taccount.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnode.h" #include "tcluster.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "mgmtGrant.h" #include "tgrant.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
...@@ -98,21 +96,21 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { ...@@ -98,21 +96,21 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId); mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId);
return TSDB_CODE_INVALID_VGROUP_ID; return TSDB_CODE_INVALID_VGROUP_ID;
} }
mgmtDecVgroupRef(pVgroup); mgmtReleaseVgroup(pVgroup);
SDbObj *pDb = mgmtGetDb(pVgroup->dbName); SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) { if (pDb == NULL) {
mError("ctable:%s, vgroup:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); mError("ctable:%s, vgroup:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB; return TSDB_CODE_INVALID_DB;
} }
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) { if (pAcct == NULL) {
mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct);
return TSDB_CODE_INVALID_ACCT; return TSDB_CODE_INVALID_ACCT;
} }
acctDecRef(pAcct); acctReleaseAcct(pAcct);
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
pTable->superTable = mgmtGetSuperTable(pTable->superTableId); pTable->superTable = mgmtGetSuperTable(pTable->superTableId);
...@@ -140,21 +138,21 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) { ...@@ -140,21 +138,21 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) {
if (pVgroup == NULL) { if (pVgroup == NULL) {
return TSDB_CODE_INVALID_VGROUP_ID; return TSDB_CODE_INVALID_VGROUP_ID;
} }
mgmtDecVgroupRef(pVgroup); mgmtReleaseVgroup(pVgroup);
SDbObj *pDb = mgmtGetDb(pVgroup->dbName); SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) { if (pDb == NULL) {
mError("ctable:%s, vgroup:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); mError("ctable:%s, vgroup:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB; return TSDB_CODE_INVALID_DB;
} }
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) { if (pAcct == NULL) {
mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct);
return TSDB_CODE_INVALID_ACCT; return TSDB_CODE_INVALID_ACCT;
} }
acctDecRef(pAcct); acctReleaseAcct(pAcct);
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
...@@ -272,7 +270,7 @@ static int32_t mgmtInitChildTables() { ...@@ -272,7 +270,7 @@ static int32_t mgmtInitChildTables() {
pNode = pLastNode; pNode = pLastNode;
continue; continue;
} }
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
...@@ -286,7 +284,7 @@ static int32_t mgmtInitChildTables() { ...@@ -286,7 +284,7 @@ static int32_t mgmtInitChildTables() {
pNode = pLastNode; pNode = pLastNode;
continue; continue;
} }
mgmtDecVgroupRef(pVgroup); mgmtReleaseVgroup(pVgroup);
if (strcmp(pVgroup->dbName, pDb->name) != 0) { if (strcmp(pVgroup->dbName, pDb->name) != 0) {
mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
...@@ -354,7 +352,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) { ...@@ -354,7 +352,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) {
if (pDb != NULL) { if (pDb != NULL) {
mgmtAddSuperTableIntoDb(pDb); mgmtAddSuperTableIntoDb(pDb);
} }
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -366,7 +364,7 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) { ...@@ -366,7 +364,7 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) {
mgmtRemoveSuperTableFromDb(pDb); mgmtRemoveSuperTableFromDb(pDb);
mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable); mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable);
} }
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -554,7 +552,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { ...@@ -554,7 +552,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
} }
pMsg->pDb = mgmtGetDb(pCreate->db); pMsg->pDb = mgmtGetDb(pCreate->db);
if (pMsg->pDb == NULL || pMsg->pDb->dirty) { if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to create, db not selected", pCreate->tableId); mError("table:%s, failed to create, db not selected", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return; return;
...@@ -572,7 +570,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { ...@@ -572,7 +570,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
SCMDropTableMsg *pDrop = pMsg->pCont; SCMDropTableMsg *pDrop = pMsg->pCont;
pMsg->pDb = mgmtGetDbByTableId(pDrop->tableId); pMsg->pDb = mgmtGetDbByTableId(pDrop->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->dirty) { if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to drop table, db not selected", pDrop->tableId); mError("table:%s, failed to drop table, db not selected", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return; return;
...@@ -611,7 +609,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { ...@@ -611,7 +609,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) {
mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle); mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle);
pMsg->pDb = mgmtGetDbByTableId(pInfo->tableId); pMsg->pDb = mgmtGetDbByTableId(pInfo->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->dirty) { if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); mError("table:%s, failed to get table meta, db not selected", pInfo->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return; return;
...@@ -860,7 +858,7 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc ...@@ -860,7 +858,7 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct != NULL) { if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables);
acctDecRef(pAcct); acctReleaseAcct(pAcct);
} }
SSdbOperDesc oper = { SSdbOperDesc oper = {
...@@ -897,7 +895,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch ...@@ -897,7 +895,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct != NULL) { if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries -= pStable->numOfTables; pAcct->acctInfo.numOfTimeSeries -= pStable->numOfTables;
acctDecRef(pAcct); acctReleaseAcct(pAcct);
} }
SSdbOperDesc oper = { SSdbOperDesc oper = {
...@@ -963,7 +961,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, ...@@ -963,7 +961,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
pShow->numOfRows = pDb->numOfSuperTables; pShow->numOfRows = pDb->numOfSuperTables;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
return 0; return 0;
} }
...@@ -1028,7 +1026,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v ...@@ -1028,7 +1026,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
return numOfRows; return numOfRows;
} }
...@@ -1106,7 +1104,7 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { ...@@ -1106,7 +1104,7 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
return; return;
} }
SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(sizeof(SCMSTableVgroupRspMsg) + sizeof(uint32_t) * mgmtGetDnodesNum()); SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(sizeof(SCMSTableVgroupRspMsg) + sizeof(uint32_t) * clusterGetDnodesNum());
if (pRsp == NULL) { if (pRsp == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return; return;
...@@ -1409,7 +1407,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc ...@@ -1409,7 +1407,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct != NULL) { if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries += ncols; pAcct->acctInfo.numOfTimeSeries += ncols;
acctDecRef(pAcct); acctReleaseAcct(pAcct);
} }
SSdbOperDesc oper = { SSdbOperDesc oper = {
...@@ -1443,7 +1441,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch ...@@ -1443,7 +1441,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct != NULL) { if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries--; pAcct->acctInfo.numOfTimeSeries--;
acctDecRef(pAcct); acctReleaseAcct(pAcct);
} }
SSdbOperDesc oper = { SSdbOperDesc oper = {
...@@ -1633,7 +1631,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { ...@@ -1633,7 +1631,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
} }
static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_t sid) { static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_t sid) {
SDnodeObj *pObj = mgmtGetDnode(dnodeId); SDnodeObj *pObj = clusterGetDnode(dnodeId);
SVgObj *pVgroup = mgmtGetVgroup(vnode); SVgObj *pVgroup = mgmtGetVgroup(vnode);
if (pObj == NULL || pVgroup == NULL) { if (pObj == NULL || pVgroup == NULL) {
...@@ -1642,7 +1640,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_ ...@@ -1642,7 +1640,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_
SChildTableObj *pTable = pVgroup->tableList[sid]; SChildTableObj *pTable = pVgroup->tableList[sid];
mgmtIncTableRef((STableObj *)pTable); mgmtIncTableRef((STableObj *)pTable);
mgmtDecVgroupRef(pVgroup); mgmtReleaseVgroup(pVgroup);
return pTable; return pTable;
} }
...@@ -1863,7 +1861,7 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -1863,7 +1861,7 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pShow->numOfRows = pDb->numOfTables; pShow->numOfRows = pDb->numOfTables;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
return 0; return 0;
} }
...@@ -1940,7 +1938,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1940,7 +1938,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
const int32_t NUM_OF_COLUMNS = 4; const int32_t NUM_OF_COLUMNS = 4;
mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
return numOfRows; return numOfRows;
} }
...@@ -1950,7 +1948,7 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { ...@@ -1950,7 +1948,7 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle);
pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId); pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->dirty) { if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to alter table, db not selected", pAlter->tableId); mError("table:%s, failed to alter table, db not selected", pAlter->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED);
return; return;
......
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
#include "trpc.h" #include "trpc.h"
#include "ttime.h" #include "ttime.h"
#include "tutil.h" #include "tutil.h"
#include "mgmtAcct.h" #include "taccount.h"
#include "mgmtGrant.h" #include "tgrant.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
...@@ -117,7 +117,7 @@ int32_t mgmtInitUsers() { ...@@ -117,7 +117,7 @@ int32_t mgmtInitUsers() {
mgmtCreateUser(pAcct, "root", "taosdata"); mgmtCreateUser(pAcct, "root", "taosdata");
mgmtCreateUser(pAcct, "monitor", tsInternalPass); mgmtCreateUser(pAcct, "monitor", tsInternalPass);
mgmtCreateUser(pAcct, "_root", tsInternalPass); mgmtCreateUser(pAcct, "_root", tsInternalPass);
acctDecRef(pAcct); acctReleaseAcct(pAcct);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_USER, mgmtProcessCreateUserMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_USER, mgmtProcessCreateUserMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_USER, mgmtProcessAlterUserMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_USER, mgmtProcessAlterUserMsg);
...@@ -137,11 +137,7 @@ SUserObj *mgmtGetUser(char *name) { ...@@ -137,11 +137,7 @@ SUserObj *mgmtGetUser(char *name) {
return (SUserObj *)sdbGetRow(tsUserSdb, name); return (SUserObj *)sdbGetRow(tsUserSdb, name);
} }
void mgmtIncUserRef(SUserObj *pUser) { void mgmtReleaseUser(SUserObj *pUser) {
return sdbIncRef(tsUserSdb, pUser);
}
void mgmtDecUserRef(SUserObj *pUser) {
return sdbDecRef(tsUserSdb, pUser); return sdbDecRef(tsUserSdb, pUser);
} }
...@@ -174,7 +170,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { ...@@ -174,7 +170,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
SUserObj *pUser = mgmtGetUser(name); SUserObj *pUser = mgmtGetUser(name);
if (pUser != NULL) { if (pUser != NULL) {
mTrace("user:%s is already there", name); mTrace("user:%s is already there", name);
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return TSDB_CODE_USER_ALREADY_EXIST; return TSDB_CODE_USER_ALREADY_EXIST;
} }
...@@ -264,7 +260,7 @@ static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon ...@@ -264,7 +260,7 @@ static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
pShow->numOfRows = pUser->pAcct->acctInfo.numOfUsers; pShow->numOfRows = pUser->pAcct->acctInfo.numOfUsers;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return 0; return 0;
} }
...@@ -299,7 +295,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void ...@@ -299,7 +295,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
cols++; cols++;
numOfRows++; numOfRows++;
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
...@@ -351,7 +347,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { ...@@ -351,7 +347,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return; return;
} }
...@@ -427,7 +423,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { ...@@ -427,7 +423,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
} }
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
} }
static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
...@@ -446,7 +442,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) { ...@@ -446,7 +442,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
if (strcmp(pUser->user, "monitor") == 0 || strcmp(pUser->user, pUser->acct) == 0 || if (strcmp(pUser->user, "monitor") == 0 || strcmp(pUser->user, pUser->acct) == 0 ||
(strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
return ; return ;
} }
...@@ -475,7 +471,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) { ...@@ -475,7 +471,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
} }
mgmtSendSimpleResp(pMsg->thandle, code); mgmtSendSimpleResp(pMsg->thandle, code);
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
} }
void mgmtDropAllUsers(SAcctObj *pAcct) { void mgmtDropAllUsers(SAcctObj *pAcct) {
...@@ -501,7 +497,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) { ...@@ -501,7 +497,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
numOfUsers++; numOfUsers++;
} }
mgmtDecUserRef(pUser); mgmtReleaseUser(pUser);
} }
mTrace("acct:%s, all users:%d is dropped from sdb", pAcct->user, numOfUsers); mTrace("acct:%s, all users:%d is dropped from sdb", pAcct->user, numOfUsers);
......
...@@ -17,12 +17,11 @@ ...@@ -17,12 +17,11 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "tstatus.h" #include "tbalance.h"
#include "tcluster.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtBalance.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
...@@ -54,11 +53,11 @@ static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) { ...@@ -54,11 +53,11 @@ static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) {
} }
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); SDnodeObj *pDnode = clusterGetDnode(pVgroup->vnodeGid[i].dnodeId);
if (pDnode) { if (pDnode) {
atomic_sub_fetch_32(&pDnode->openVnodes, 1); atomic_sub_fetch_32(&pDnode->openVnodes, 1);
} }
mgmtDecDnodeRef(pDnode); clusterReleaseDnode(pDnode);
} }
tfree(pOper->pObj); tfree(pOper->pObj);
...@@ -71,7 +70,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { ...@@ -71,7 +70,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
if (pDb == NULL) { if (pDb == NULL) {
return TSDB_CODE_INVALID_DB; return TSDB_CODE_INVALID_DB;
} }
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
pVgroup->pDb = pDb; pVgroup->pDb = pDb;
pVgroup->prev = NULL; pVgroup->prev = NULL;
...@@ -92,12 +91,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { ...@@ -92,12 +91,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
} }
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); SDnodeObj *pDnode = clusterGetDnode(pVgroup->vnodeGid[i].dnodeId);
if (pDnode != NULL) { if (pDnode != NULL) {
pVgroup->vnodeGid[i].privateIp = pDnode->privateIp; pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp; pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
atomic_add_fetch_32(&pDnode->openVnodes, 1); atomic_add_fetch_32(&pDnode->openVnodes, 1);
mgmtDecDnodeRef(pDnode); clusterReleaseDnode(pDnode);
} }
} }
...@@ -114,7 +113,7 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) { ...@@ -114,7 +113,7 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) {
mgmtRemoveVgroupFromDb(pVgroup); mgmtRemoveVgroupFromDb(pVgroup);
} }
mgmtDecDbRef(pVgroup->pDb); mgmtReleaseDb(pVgroup->pDb);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -190,11 +189,7 @@ int32_t mgmtInitVgroups() { ...@@ -190,11 +189,7 @@ int32_t mgmtInitVgroups() {
return 0; return 0;
} }
void mgmtIncVgroupRef(SVgObj *pVgroup) { void mgmtReleaseVgroup(SVgObj *pVgroup) {
return sdbIncRef(tsVgroupSdb, pVgroup);
}
void mgmtDecVgroupRef(SVgObj *pVgroup) {
return sdbDecRef(tsVgroupSdb, pVgroup); return sdbDecRef(tsVgroupSdb, pVgroup);
} }
...@@ -202,16 +197,32 @@ SVgObj *mgmtGetVgroup(int32_t vgId) { ...@@ -202,16 +197,32 @@ SVgObj *mgmtGetVgroup(int32_t vgId) {
return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId);
} }
void mgmtUpdateVgroup(SVgObj *pVgroup) {
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
.rowSize = tsVgUpdateSize
};
sdbUpdateRow(&oper);
mgmtSendCreateVgroupMsg(pVgroup, NULL);
}
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
return pDb->pHead; return pDb->pHead;
} }
void *mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup) {
return sdbFetchRow(tsVgroupSdb, pNode, (void **)pVgroup);
}
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj)); SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
strcpy(pVgroup->dbName, pDb->name); strcpy(pVgroup->dbName, pDb->name);
pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->numOfVnodes = pDb->cfg.replications;
pVgroup->createdTime = taosGetTimestampMs(); pVgroup->createdTime = taosGetTimestampMs();
if (mgmtAllocVnodes(pVgroup) != 0) { if (balanceAllocVnodes(pVgroup) != 0) {
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes); mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes);
free(pVgroup); free(pVgroup);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES);
...@@ -302,7 +313,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { ...@@ -302,7 +313,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
mgmtDecTableRef(pTable); mgmtDecTableRef(pTable);
pVgroup = mgmtGetVgroup(((SChildTableObj*)pTable)->vgId); pVgroup = mgmtGetVgroup(((SChildTableObj*)pTable)->vgId);
if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID; if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
mgmtDecVgroupRef(pVgroup); mgmtReleaseVgroup(pVgroup);
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
} else { } else {
SVgObj *pVgroup = pDb->pHead; SVgObj *pVgroup = pDb->pHead;
...@@ -348,26 +359,26 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { ...@@ -348,26 +359,26 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pShow->pNode = pVgroup; pShow->pNode = pVgroup;
} }
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
return 0; return 0;
} }
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
SDnodeObj *pDnode = mgmtGetDnode(pVnode->dnodeId); SDnodeObj *pDnode = clusterGetDnode(pVnode->dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
mError("vgroup:%d, not exist in dnode:%d", pVgroup->vgId, pDnode->dnodeId); mError("vgroup:%d, not exist in dnode:%d", pVgroup->vgId, pDnode->dnodeId);
return "null"; return "null";
} }
mgmtDecDnodeRef(pDnode); clusterReleaseDnode(pDnode);
if (pDnode->status == TSDB_DN_STATUS_OFFLINE) { if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
return "offline"; return "offline";
} }
for (int i = 0; i < pDnode->openVnodes; ++i) { for (int i = 0; i < pDnode->openVnodes; ++i) {
if (pDnode->vload[i].vgId == pVgroup->vgId) { if (pDnode->vload[i].vgId == pVgroup->vgId) {
return (char*)taosGetVnodeStatusStr(pDnode->vload[i].status); return pDnode->vload[i].status ? "ready" : "offline";
} }
} }
...@@ -407,7 +418,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -407,7 +418,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus)); strcpy(pWrite, pVgroup->status ? "updating" : "ready");
cols++; cols++;
for (int32_t i = 0; i < maxReplica; ++i) { for (int32_t i = 0; i < maxReplica; ++i) {
...@@ -434,7 +445,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -434,7 +445,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
mgmtDecDbRef(pDb); mgmtReleaseDb(pDb);
return numOfRows; return numOfRows;
} }
...@@ -645,13 +656,13 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) { ...@@ -645,13 +656,13 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->vgId = htonl(pCfg->vgId); pCfg->vgId = htonl(pCfg->vgId);
SDnodeObj *pDnode = mgmtGetDnode(pCfg->dnodeId); SDnodeObj *pDnode = clusterGetDnode(pCfg->dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
mTrace("dnode:%s, invalid dnode", taosIpStr(pCfg->dnodeId), pCfg->vgId); mTrace("dnode:%s, invalid dnode", taosIpStr(pCfg->dnodeId), pCfg->vgId);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
return; return;
} }
mgmtDecDnodeRef(pDnode); clusterReleaseDnode(pDnode);
SVgObj *pVgroup = mgmtGetVgroup(pCfg->vgId); SVgObj *pVgroup = mgmtGetVgroup(pCfg->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
...@@ -659,7 +670,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) { ...@@ -659,7 +670,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
return; return;
} }
mgmtDecVgroupRef(pVgroup); mgmtReleaseVgroup(pVgroup);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
...@@ -675,7 +686,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) { ...@@ -675,7 +686,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
while (1) { while (1) {
mgmtDecVgroupRef(pVgroup); mgmtReleaseVgroup(pVgroup);
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
if (pVgroup == NULL) break; if (pVgroup == NULL) break;
...@@ -704,4 +715,5 @@ void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle) { ...@@ -704,4 +715,5 @@ void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle) {
} else { } else {
mgmtAddToShellQueue(ahandle); mgmtAddToShellQueue(ahandle);
} }
} }
\ No newline at end of file
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "tlog.h" #include "tlog.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tscompression.h" #include "tscompression.h"
#include "tstatus.h"
#include "ttime.h" #include "ttime.h"
#include "qast.h" #include "qast.h"
......
...@@ -28,8 +28,6 @@ enum _module { ...@@ -28,8 +28,6 @@ enum _module {
TSDB_MOD_MGMT, TSDB_MOD_MGMT,
TSDB_MOD_HTTP, TSDB_MOD_HTTP,
TSDB_MOD_MONITOR, TSDB_MOD_MONITOR,
TSDB_MOD_DCLUSTER,
TSDB_MOD_MSTORAGE,
TSDB_MOD_MAX TSDB_MOD_MAX
}; };
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSTATUS_H
#define TDENGINE_TSTATUS_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "taoserror.h"
enum _TSDB_VG_STATUS {
TSDB_VG_STATUS_READY = TSDB_CODE_SUCCESS,
TSDB_VG_STATUS_IN_PROGRESS = 1, //TSDB_CODE_ACTION_IN_PROGRESS,
TSDB_VG_STATUS_NO_DISK_PERMISSIONS = 73,//TSDB_CODE_NO_DISK_PERMISSIONS,
TSDB_VG_STATUS_SERVER_NO_PACE = 110, //TSDB_CODE_SERV_NO_DISKSPACE,
TSDB_VG_STATUS_SERV_OUT_OF_MEMORY = 69, //TSDB_CODE_SERV_OUT_OF_MEMORY,
TSDB_VG_STATUS_INIT_FAILED = 74, //TSDB_CODE_VG_INIT_FAILED,
TSDB_VG_STATUS_FULL = 48, //TSDB_CODE_NO_ENOUGH_DNODES,
};
enum _TSDB_DB_STATUS {
TSDB_DB_STATUS_READY,
TSDB_DB_STATUS_DROPPING,
TSDB_DB_STATUS_DROP_FROM_SDB
};
typedef enum _TSDB_VN_STATUS {
TSDB_VN_STATUS_NOT_READY,
TSDB_VN_STATUS_UNSYNCED,
TSDB_VN_STATUS_SLAVE,
TSDB_VN_STATUS_MASTER,
TSDB_VN_STATUS_CREATING,
TSDB_VN_STATUS_CLOSING,
TSDB_VN_STATUS_DELETING,
} EVnodeStatus;
enum _TSDB_VN_SYNC_STATUS {
TSDB_VN_SYNC_STATUS_INIT,
TSDB_VN_SYNC_STATUS_SYNCING,
TSDB_VN_SYNC_STATUS_SYNC_CACHE,
TSDB_VN_SYNC_STATUS_SYNC_FILE
};
enum _TSDB_VN_DROP_STATUS {
TSDB_VN_DROP_STATUS_READY,
TSDB_VN_DROP_STATUS_DROPPING
};
enum _TSDB_MN_STATUS {
TSDB_MN_STATUS_OFFLINE,
TSDB_MN_STATUS_UNSYNCED,
TSDB_MN_STATUS_SYNCING,
TSDB_MN_STATUS_SERVING
};
enum _TSDB_MN_ROLE {
TSDB_MN_ROLE_UNDECIDED,
TSDB_MN_ROLE_SLAVE,
TSDB_MN_ROLE_MASTER
};
enum _TSDB_DN_STATUS {
TSDB_DN_STATUS_OFFLINE,
TSDB_DN_STATUS_READY
};
enum _TSDB_DN_LB_STATUS {
TSDB_DN_LB_STATUS_BALANCED,
TSDB_DN_LB_STATUS_BALANCING,
TSDB_DN_LB_STATUS_OFFLINE_REMOVING,
TSDB_DN_LB_STATE_SHELL_REMOVING
};
enum _TSDB_VG_LB_STATUS {
TSDB_VG_LB_STATUS_READY,
TSDB_VG_LB_STATUS_UPDATE
};
enum _TSDB_VN_STREAM_STATUS {
TSDB_VN_STREAM_STATUS_STOP,
TSDB_VN_STREAM_STATUS_START
};
enum TSDB_TABLE_STATUS {
TSDB_METER_STATE_READY = 0x00,
TSDB_METER_STATE_INSERTING = 0x01,
TSDB_METER_STATE_IMPORTING = 0x02,
TSDB_METER_STATE_UPDATING = 0x04,
TSDB_METER_STATE_DROPPING = 0x10,
TSDB_METER_STATE_DROPPED = 0x18,
};
char* taosGetVgroupStatusStr(int32_t vgroupStatus);
char* taosGetDbStatusStr(int32_t dbStatus);
char* taosGetVnodeStatusStr(int32_t vnodeStatus);
char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus);
char* taosGetVnodeDropStatusStr(int32_t dropping);
char* taosGetDnodeStatusStr(int32_t dnodeStatus);
char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus);
char* taosGetVgroupLbStatusStr(int32_t vglbStatus);
char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus);
char* taosGetTableStatusStr(int32_t tableStatus);
char *taosGetShowTypeStr(int32_t showType);
char *taosGetMnodeStatusStr(int32_t mnodeStatus);
char *taosGetMnodeRoleStr(int32_t mnodeRole);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSTATUS_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taosmsg.h"
#include "tstatus.h"
char* taosGetVgroupStatusStr(int32_t vgroupStatus) {
switch (vgroupStatus) {
case TSDB_VG_STATUS_READY: return (char*)tstrerror(vgroupStatus);
case TSDB_VG_STATUS_IN_PROGRESS: return (char*)tstrerror(vgroupStatus);
case TSDB_VG_STATUS_NO_DISK_PERMISSIONS: return (char*)tstrerror(vgroupStatus);
case TSDB_VG_STATUS_SERVER_NO_PACE: return (char*)tstrerror(vgroupStatus);
case TSDB_VG_STATUS_SERV_OUT_OF_MEMORY: return (char*)tstrerror(vgroupStatus);
case TSDB_VG_STATUS_INIT_FAILED: return (char*)tstrerror(vgroupStatus);
case TSDB_VG_STATUS_FULL: return (char*)tstrerror(vgroupStatus);
default: return "undefined";
}
}
char* taosGetDbStatusStr(int32_t dbStatus) {
switch (dbStatus) {
case TSDB_DB_STATUS_READY: return "ready";
case TSDB_DB_STATUS_DROPPING: return "dropping";
case TSDB_DB_STATUS_DROP_FROM_SDB: return "drop_from_sdb";
default: return "undefined";
}
}
char* taosGetVnodeStatusStr(int32_t vnodeStatus) {
switch (vnodeStatus) {
case TSDB_VN_STATUS_NOT_READY:return "not_ready";
case TSDB_VN_STATUS_UNSYNCED: return "unsynced";
case TSDB_VN_STATUS_SLAVE: return "slave";
case TSDB_VN_STATUS_MASTER: return "master";
case TSDB_VN_STATUS_CREATING: return "creating";
case TSDB_VN_STATUS_CLOSING: return "closing";
case TSDB_VN_STATUS_DELETING: return "deleting";
default: return "undefined";
}
}
char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus) {
switch (vnodeSyncStatus) {
case TSDB_VN_SYNC_STATUS_INIT: return "ready";
case TSDB_VN_SYNC_STATUS_SYNCING: return "syncing";
case TSDB_VN_SYNC_STATUS_SYNC_CACHE: return "sync_cache";
case TSDB_VN_SYNC_STATUS_SYNC_FILE: return "sync_file";
default: return "undefined";
}
}
char* taosGetVnodeDropStatusStr(int32_t dropping) {
switch (dropping) {
case TSDB_VN_DROP_STATUS_READY: return "ready";
case TSDB_VN_DROP_STATUS_DROPPING: return "dropping";
default: return "undefined";
}
}
char* taosGetDnodeStatusStr(int32_t dnodeStatus) {
switch (dnodeStatus) {
case TSDB_DN_STATUS_OFFLINE: return "offline";
case TSDB_DN_STATUS_READY: return "ready";
default: return "undefined";
}
}
char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus) {
switch (dnodeBalanceStatus) {
case TSDB_DN_LB_STATUS_BALANCED: return "balanced";
case TSDB_DN_LB_STATUS_BALANCING: return "balancing";
case TSDB_DN_LB_STATUS_OFFLINE_REMOVING: return "offline removing";
case TSDB_DN_LB_STATE_SHELL_REMOVING: return "removing";
default: return "undefined";
}
}
char* taosGetVgroupLbStatusStr(int32_t vglbStatus) {
switch (vglbStatus) {
case TSDB_VG_LB_STATUS_READY: return "ready";
case TSDB_VG_LB_STATUS_UPDATE: return "updating";
default: return "undefined";
}
}
char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus) {
switch (vnodeStreamStatus) {
case TSDB_VN_STREAM_STATUS_START: return "start";
case TSDB_VN_STREAM_STATUS_STOP: return "stop";
default: return "undefined";
}
}
char* taosGetTableStatusStr(int32_t tableStatus) {
switch(tableStatus) {
case TSDB_METER_STATE_INSERTING:return "inserting";
case TSDB_METER_STATE_IMPORTING:return "importing";
case TSDB_METER_STATE_UPDATING: return "updating";
case TSDB_METER_STATE_DROPPING: return "deleting";
case TSDB_METER_STATE_DROPPED: return "dropped";
case TSDB_METER_STATE_READY: return "ready";
default:return "undefined";
}
}
char *taosGetShowTypeStr(int32_t showType) {
switch (showType) {
case TSDB_MGMT_TABLE_ACCT: return "show accounts";
case TSDB_MGMT_TABLE_USER: return "show users";
case TSDB_MGMT_TABLE_DB: return "show databases";
case TSDB_MGMT_TABLE_TABLE: return "show tables";
case TSDB_MGMT_TABLE_DNODE: return "show dnodes";
case TSDB_MGMT_TABLE_MNODE: return "show mnodes";
case TSDB_MGMT_TABLE_VGROUP: return "show vgroups";
case TSDB_MGMT_TABLE_METRIC: return "show stables";
case TSDB_MGMT_TABLE_MODULE: return "show modules";
case TSDB_MGMT_TABLE_QUERIES: return "show queries";
case TSDB_MGMT_TABLE_STREAMS: return "show streams";
case TSDB_MGMT_TABLE_CONFIGS: return "show configs";
case TSDB_MGMT_TABLE_CONNS: return "show connections";
case TSDB_MGMT_TABLE_SCORES: return "show scores";
case TSDB_MGMT_TABLE_GRANTS: return "show grants";
case TSDB_MGMT_TABLE_VNODES: return "show vnodes";
default: return "undefined";
}
}
char *taosGetMnodeStatusStr(int32_t mnodeStatus) {
switch (mnodeStatus) {
case TSDB_MN_STATUS_OFFLINE: return "offline";
case TSDB_MN_STATUS_UNSYNCED: return "unsynced";
case TSDB_MN_STATUS_SYNCING: return "syncing";
case TSDB_MN_STATUS_SERVING: return "serving";
default: return "undefined";
}
}
char *taosGetMnodeRoleStr(int32_t mnodeRole) {
switch (mnodeRole) {
case TSDB_MN_ROLE_UNDECIDED: return "undicided";
case TSDB_MN_ROLE_SLAVE: return "slave";
case TSDB_MN_ROLE_MASTER: return "master";
default: return "undefined";
}
}
...@@ -23,18 +23,10 @@ extern "C" { ...@@ -23,18 +23,10 @@ extern "C" {
#include "tsync.h" #include "tsync.h"
#include "twal.h" #include "twal.h"
typedef enum _VN_STATUS {
VN_STATUS_INIT,
VN_STATUS_CREATING,
VN_STATUS_READY,
VN_STATUS_CLOSING,
VN_STATUS_DELETING,
} EVnStatus;
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
EVnStatus status; int status;
int8_t role; int8_t role;
int64_t version; int64_t version;
void *wqueue; void *wqueue;
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tstatus.h"
#include "tsdb.h" #include "tsdb.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -117,7 +116,7 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -117,7 +116,7 @@ int32_t vnodeDrop(int32_t vgId) {
} }
dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId); dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId);
pVnode->status = VN_STATUS_DELETING; pVnode->status = TAOS_VN_STATUS_DELETING;
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -129,7 +128,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -129,7 +128,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1); SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
pVnode->vgId = vnode; pVnode->vgId = vnode;
pVnode->status = VN_STATUS_INIT; pVnode->status = TAOS_VN_STATUS_INIT;
pVnode->refCount = 1; pVnode->refCount = 1;
pVnode->version = 0; pVnode->version = 0;
taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode)); taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
...@@ -179,7 +178,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -179,7 +178,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
pVnode->status = VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
atomic_add_fetch_32(&tsOpennedVnodes, 1); atomic_add_fetch_32(&tsOpennedVnodes, 1);
...@@ -192,7 +191,7 @@ int32_t vnodeClose(int32_t vgId) { ...@@ -192,7 +191,7 @@ int32_t vnodeClose(int32_t vgId) {
if (pVnode == NULL) return 0; if (pVnode == NULL) return 0;
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
pVnode->status = VN_STATUS_CLOSING; pVnode->status = TAOS_VN_STATUS_CLOSING;
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return 0; return 0;
...@@ -216,7 +215,7 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -216,7 +215,7 @@ void vnodeRelease(void *pVnodeRaw) {
dnodeFreeWqueue(pVnode->wqueue); dnodeFreeWqueue(pVnode->wqueue);
pVnode->wqueue = NULL; pVnode->wqueue = NULL;
if (pVnode->status == VN_STATUS_DELETING) { if (pVnode->status == TAOS_VN_STATUS_DELETING) {
// remove the whole directory // remove the whole directory
} }
...@@ -276,7 +275,7 @@ void vnodeBuildStatusMsg(void *param) { ...@@ -276,7 +275,7 @@ void vnodeBuildStatusMsg(void *param) {
static void vnodeBuildVloadMsg(char *pNode, void * param) { static void vnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeObj *pVnode = *(SVnodeObj **) pNode; SVnodeObj *pVnode = *(SVnodeObj **) pNode;
if (pVnode->status == VN_STATUS_DELETING) return; if (pVnode->status == TAOS_VN_STATUS_DELETING) return;
SDMStatusMsg *pStatus = param; SDMStatusMsg *pStatus = param;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
...@@ -285,6 +284,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { ...@@ -285,6 +284,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
pLoad->vgId = htonl(pVnode->vgId); pLoad->vgId = htonl(pVnode->vgId);
pLoad->vnode = htonl(pVnode->vgId); pLoad->vnode = htonl(pVnode->vgId);
pLoad->status = pVnode->status; pLoad->status = pVnode->status;
pLoad->role = pVnode->role;
} }
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
......
...@@ -42,7 +42,7 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, ...@@ -42,7 +42,7 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen,
if (vnodeProcessReadMsgFp[msgType] == NULL) if (vnodeProcessReadMsgFp[msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED; return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING) if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_NOT_ACTIVE_VNODE; return TSDB_CODE_NOT_ACTIVE_VNODE;
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
......
...@@ -41,18 +41,19 @@ void vnodeInitWriteFp(void) { ...@@ -41,18 +41,19 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg;
} }
int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) { int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
int32_t code = 0; int32_t code = 0;
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param1;
SWalHead *pHead = param2;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED; return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING) if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_NOT_ACTIVE_VNODE; return TSDB_CODE_NOT_ACTIVE_VNODE;
if (pHead->version == 0) { // from client if (pHead->version == 0) { // from client
if (pVnode->status != VN_STATUS_READY) if (pVnode->status != TAOS_VN_STATUS_READY)
return TSDB_CODE_NOT_ACTIVE_VNODE; return TSDB_CODE_NOT_ACTIVE_VNODE;
// if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) // if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册