提交 a00be5e3 编写于 作者: S slguan

refact code for cluster

上级 0d4618ce
...@@ -25,18 +25,10 @@ extern "C" { ...@@ -25,18 +25,10 @@ extern "C" {
extern uint16_t tsMgmtMgmtPort; extern uint16_t tsMgmtMgmtPort;
extern uint16_t tsMgmtSyncPort; extern uint16_t tsMgmtSyncPort;
extern int sdbMaxNodes;
extern int tsMgmtPeerHBTimer; // seconds extern int tsMgmtPeerHBTimer; // seconds
extern char sdbZone[];
extern char sdbMasterIp[];
extern char sdbPrivateIp[];
extern char * sdbStatusStr[]; extern char * sdbStatusStr[];
extern char * sdbRoleStr[]; extern char * sdbRoleStr[];
extern void * mnodeSdb;
extern int sdbExtConns;
extern int sdbMaster; extern int sdbMaster;
extern uint32_t sdbPublicIp;
extern uint32_t sdbMasterStartTime;
extern SRpcIpSet *pSdbIpList; extern SRpcIpSet *pSdbIpList;
extern SRpcIpSet *pSdbPublicIpList; extern SRpcIpSet *pSdbPublicIpList;
...@@ -89,14 +81,9 @@ typedef struct { ...@@ -89,14 +81,9 @@ typedef struct {
// internal // internal
int syncFd; int syncFd;
void *hbTimer; void *hbTimer;
void *thandle;
void *pSync; void *pSync;
} SSdbPeer; } SSdbPeer;
SSdbPeer *sdbAddPeer(uint32_t ip, uint32_t publicIp, char role);
void sdbUpdateIpList();
extern SSdbPeer *sdbPeer[]; extern SSdbPeer *sdbPeer[];
#define sdbInited (sdbPeer[0]) #define sdbInited (sdbPeer[0])
#define sdbStatus (sdbPeer[0]->status) #define sdbStatus (sdbPeer[0]->status)
...@@ -130,8 +117,6 @@ int sdbInitPeers(char *directory); ...@@ -130,8 +117,6 @@ int sdbInitPeers(char *directory);
void sdbCleanUpPeers(); void sdbCleanUpPeers();
int sdbCfgNode(char *cont);
int64_t sdbGetVersion(); int64_t sdbGetVersion();
int32_t sdbGetRunStatus(); int32_t sdbGetRunStatus();
......
...@@ -600,17 +600,6 @@ typedef struct { ...@@ -600,17 +600,6 @@ typedef struct {
SVnodeAccess vnodeAccess[]; SVnodeAccess vnodeAccess[];
} SStatusRsp; } SStatusRsp;
// internal message
typedef struct {
uint32_t destId;
uint32_t destIp;
char tableId[TSDB_UNI_LEN + 1];
char empty[3];
uint8_t msgType;
int32_t msgLen;
uint8_t content[0];
} SIntMsg;
typedef struct { typedef struct {
char spi; char spi;
char encrypt; char encrypt;
......
...@@ -22,7 +22,6 @@ extern "C" { ...@@ -22,7 +22,6 @@ extern "C" {
#include "mnode.h" #include "mnode.h"
void mgmtStartBalanceTimer(int64_t mseconds);
int32_t mgmtInitBalance(); int32_t mgmtInitBalance();
void mgmtCleanupBalance(); void mgmtCleanupBalance();
int32_t mgmtAllocVnodes(SVgObj *pVgroup); int32_t mgmtAllocVnodes(SVgObj *pVgroup);
......
...@@ -58,6 +58,8 @@ SDnodeObj* mgmtGetDnode(uint32_t ip); ...@@ -58,6 +58,8 @@ SDnodeObj* mgmtGetDnode(uint32_t ip);
extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip); extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip);
extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip); extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip);
void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -455,9 +455,9 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { ...@@ -455,9 +455,9 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
// //
// SVgObj *pVgroup = pDb->pHead; // SVgObj *pVgroup = pDb->pHead;
// while (pVgroup != NULL) { // while (pVgroup != NULL) {
// mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); // balanceUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0);
// if (oldReplicaNum < pDb->cfg.replications) { // if (oldReplicaNum < pDb->cfg.replications) {
// if (!mgmtAddVnode(pVgroup, NULL, NULL)) { // if (!balanceAddVnode(pVgroup, NULL, NULL)) {
// mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); // mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId);
// code = TSDB_CODE_NO_ENOUGH_DNODES; // code = TSDB_CODE_NO_ENOUGH_DNODES;
// } // }
......
...@@ -609,6 +609,9 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) { ...@@ -609,6 +609,9 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) {
int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
if (mgmtGetScoresMetaFp) { if (mgmtGetScoresMetaFp) {
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
return mgmtGetScoresMetaFp(pMeta, pShow, pConn); return mgmtGetScoresMetaFp(pMeta, pShow, pConn);
} else { } else {
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
......
...@@ -312,7 +312,7 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { ...@@ -312,7 +312,7 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score); mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score);
pDnode->customScore = score; pDnode->customScore = score;
mgmtUpdateDnode(pDnode); mgmtUpdateDnode(pDnode);
mgmtStartBalanceTimer(15); //mgmtStartBalanceTimer(15);
} }
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} else if (strncasecmp(option, "bandwidth", 9) == 0) { } else if (strncasecmp(option, "bandwidth", 9) == 0) {
......
...@@ -51,7 +51,6 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL ...@@ -51,7 +51,6 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL
static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle);
static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
uint32_t mgmtAccessSquence = 0;
void *tsShellConnServer = NULL; void *tsShellConnServer = NULL;
void mgmtProcessTranRequest(SSchedMsg *sched) { void mgmtProcessTranRequest(SSchedMsg *sched) {
......
...@@ -61,10 +61,6 @@ int32_t mgmtCheckMgmtRunning() { ...@@ -61,10 +61,6 @@ int32_t mgmtCheckMgmtRunning() {
tsetModuleStatus(TSDB_MOD_MGMT); tsetModuleStatus(TSDB_MOD_MGMT);
// strcpy(sdbMasterIp, mgmtIpStr[0]);
// strcpy(sdbPrivateIp, tsPrivateIp);
// sdbPublicIp = inet_addr(tsPublicIp);
return 0; return 0;
} }
......
...@@ -318,6 +318,27 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { ...@@ -318,6 +318,27 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
return 0; return 0;
} }
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
SDnodeObj *pDnode = mgmtGetDnode(pVnode->ip);
if (pDnode == NULL) {
mError("dnode:%s, vgroup:%d, vnode:%d dnode not exist", taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode);
return "null";
}
if (pDnode->status == TSDB_DN_STATUS_OFFLINE) {
return "offline";
}
SVnodeLoad *vload = pDnode->vload + pVnode->vnode;
if (vload->vgId != pVgroup->vgId || vload->vnode != pVnode->vnode) {
mError("dnode:%s, vgroup:%d, vnode:%d not same with dnode vgroup:%d vnode:%d",
taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode, vload->vgId, vload->vnode);
return "null";
}
return (char*)taosGetVnodeStatusStr(vload->status);
}
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
......
...@@ -48,6 +48,21 @@ ...@@ -48,6 +48,21 @@
#define sdbPrint(...) \ #define sdbPrint(...) \
{ tprintf("MND-SDB ", 255, __VA_ARGS__); } { tprintf("MND-SDB ", 255, __VA_ARGS__); }
#define mpeerError(...) \
if (sdbDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MND-MPEER ", 255, __VA_ARGS__); \
}
#define mpeerWarn(...) \
if (sdbDebugFlag & DEBUG_WARN) { \
tprintf("WARN MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \
}
#define mpeerTrace(...) \
if (sdbDebugFlag & DEBUG_TRACE) { \
tprintf("MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \
}
#define mpeerPrint(...) \
{ tprintf("MND-MPEER ", 255, __VA_ARGS__); }
#define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__) #define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__)
#define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__) #define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__)
#define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__) #define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__)
...@@ -69,11 +84,6 @@ typedef struct { ...@@ -69,11 +84,6 @@ typedef struct {
char *row; char *row;
} SSdbUpdate; } SSdbUpdate;
typedef struct {
char numOfTables;
uint64_t version[];
} SSdbSync;
typedef struct { typedef struct {
SSdbHeader header; SSdbHeader header;
int maxRows; int maxRows;
...@@ -109,23 +119,6 @@ typedef struct { ...@@ -109,23 +119,6 @@ typedef struct {
char data[]; char data[];
} SRowHead; } SRowHead;
typedef struct {
char * buffer;
char * offset;
int trans;
int bufferSize;
pthread_mutex_t qmutex;
} STranQueue;
typedef struct {
char status;
char role;
char numOfMnodes;
uint64_t dbVersion;
uint32_t numOfDnodes;
uint32_t publicIp;
} SMnodeStatus;
typedef struct { typedef struct {
uint8_t dbId; uint8_t dbId;
char type; char type;
...@@ -139,8 +132,8 @@ extern int sdbMaxPeers; ...@@ -139,8 +132,8 @@ extern int sdbMaxPeers;
extern int sdbNumOfTables; extern int sdbNumOfTables;
extern int64_t sdbVersion; extern int64_t sdbVersion;
int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen); int mpeerForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen);
int sdbRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); int mpeerRetrieveRows(int fd, SSdbTable *pTable, uint64_t version);
void sdbResetTable(SSdbTable *pTable); void sdbResetTable(SSdbTable *pTable);
extern const int16_t sdbFileVersion; extern const int16_t sdbFileVersion;
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
extern char version[]; extern char version[];
const int16_t sdbFileVersion = 0; const int16_t sdbFileVersion = 0;
int sdbExtConns = 0;
SRpcIpSet *pSdbIpList = NULL; SRpcIpSet *pSdbIpList = NULL;
SRpcIpSet *pSdbPublicIpList = NULL; SRpcIpSet *pSdbPublicIpList = NULL;
SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self
...@@ -431,7 +430,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { ...@@ -431,7 +430,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) { if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) {
pTable->id++; pTable->id++;
sdbVersion++; sdbVersion++;
if (pTable->keyType == SDB_KEYTYPE_AUTO) { if (pTable->keyType == SDB_KEYTYPE_AUTO) {
...@@ -548,7 +547,7 @@ int sdbDeleteRow(void *handle, void *row) { ...@@ -548,7 +547,7 @@ int sdbDeleteRow(void *handle, void *row) {
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) {
pTable->id++; pTable->id++;
sdbVersion++; sdbVersion++;
...@@ -666,7 +665,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { ...@@ -666,7 +665,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) {
pTable->id++; pTable->id++;
sdbVersion++; sdbVersion++;
...@@ -745,7 +744,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) { ...@@ -745,7 +744,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) {
} }
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_BATCH_UPDATE, row, rowSize) == 0) { if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_BATCH_UPDATE, row, rowSize) == 0) {
/* // write action */ /* // write action */
/* write(pTable->fd, &action, sizeof(action)); */ /* write(pTable->fd, &action, sizeof(action)); */
/* pTable->size += sizeof(action); */ /* pTable->size += sizeof(action); */
......
...@@ -24,7 +24,7 @@ char* sdbRoleStr[] = {"unauthed", "undecided", "master", "slave", "null"}; ...@@ -24,7 +24,7 @@ char* sdbRoleStr[] = {"unauthed", "undecided", "master", "slave", "null"};
/* /*
* Lite Version sync request is always successful * Lite Version sync request is always successful
*/ */
int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen) { int mpeerForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen) {
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册