提交 b9767722 编写于 作者: H hzcheng

Merge branch '2.0' into feature/2.0tsdb

...@@ -33,11 +33,11 @@ extern int32_t (*dnodeInitPeers)(int32_t numOfThreads); ...@@ -33,11 +33,11 @@ extern int32_t (*dnodeInitPeers)(int32_t numOfThreads);
extern int32_t (*dnodeCheckSystem)(); extern int32_t (*dnodeCheckSystem)();
extern int32_t (*dnodeInitStorage)(); extern int32_t (*dnodeInitStorage)();
extern void (*dnodeCleanupStorage)(); extern void (*dnodeCleanupStorage)();
extern void (*dnodeParseParameterK)();
extern int32_t tsMaxQueues; extern int32_t tsMaxQueues;
extern void ** tsRpcQhandle; extern void ** tsRpcQhandle;
extern void *tsQueryQhandle; extern void *tsQueryQhandle;
extern void *tsDnodeMgmtQhandle; extern void *tsDnodeMgmtQhandle;
extern void *tsDnodeTmr;
int32_t dnodeInitSystem(); int32_t dnodeInitSystem();
void dnodeCleanUpSystem(); void dnodeCleanUpSystem();
......
...@@ -58,6 +58,8 @@ int32_t dnodeDropVnode(int32_t vnode); ...@@ -58,6 +58,8 @@ int32_t dnodeDropVnode(int32_t vnode);
//tsdb_repo_t* dnodeGetVnode(int vid); //tsdb_repo_t* dnodeGetVnode(int vid);
void* dnodeGetVnode(int32_t vnode); void* dnodeGetVnode(int32_t vnode);
int32_t dnodeGetVnodesNum();
/* /*
* get the status of vnode * get the status of vnode
*/ */
......
...@@ -29,10 +29,14 @@ ...@@ -29,10 +29,14 @@
void (*dnodeInitMgmtIpFp)() = NULL; void (*dnodeInitMgmtIpFp)() = NULL;
int32_t (*dnodeInitMgmtFp)() = NULL; int32_t (*dnodeInitMgmtFp)() = NULL;
void (*dnodeCleanUpMgmtFp)() = NULL;
void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
static void *tsStatusTimer = NULL;
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn); static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn);
static void dnodeInitProcessShellMsg(); static void dnodeInitProcessShellMsg();
...@@ -86,12 +90,69 @@ void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, ...@@ -86,12 +90,69 @@ void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont,
} }
} }
void dnodeSendStatusMsgToMgmt(void *handle, void *tmrId) {
taosTmrReset(dnodeSendStatusMsgToMgmt, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
if (tsStatusTimer == NULL) {
dError("Failed to start status timer");
return;
}
int32_t contLen = sizeof(SStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
dError("Failed to malloc status message");
return;
}
int32_t totalVnodes = dnodeGetVnodesNum();
pStatus->version = htonl(tsVersion);
pStatus->privateIp = htonl(inet_addr(tsPrivateIp));
pStatus->publicIp = htonl(inet_addr(tsPublicIp));
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
pStatus->openVnodes = htons((uint16_t) totalVnodes);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
SVnodeLoad *pLoad = (SVnodeLoad *)pStatus->load;
//TODO loop all vnodes
// for (int32_t vnode = 0, count = 0; vnode <= totalVnodes; ++vnode) {
// if (vnodeList[vnode].cfg.maxSessions <= 0) continue;
//
// SVnodeObj *pVnode = vnodeList + vnode;
// pLoad->vnode = htonl(vnode);
// pLoad->vgId = htonl(pVnode->cfg.vgId);
// pLoad->status = (uint8_t)vnodeList[vnode].vnodeStatus;
// pLoad->syncStatus =(uint8_t)vnodeList[vnode].syncStatus;
// pLoad->accessState = (uint8_t)(pVnode->accessState);
// pLoad->totalStorage = htobe64(pVnode->vnodeStatistic.totalStorage);
// pLoad->compStorage = htobe64(pVnode->vnodeStatistic.compStorage);
// if (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) {
// pLoad->pointsWritten = htobe64(pVnode->vnodeStatistic.pointsWritten);
// } else {
// pLoad->pointsWritten = htobe64(0);
// }
// pLoad++;
//
// if (++count >= tsOpenVnodes) {
// break;
// }
// }
dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen);
}
int32_t dnodeInitMgmt() { int32_t dnodeInitMgmt() {
if (dnodeInitMgmtFp) { if (dnodeInitMgmtFp) {
dnodeInitMgmtFp(); dnodeInitMgmtFp();
} }
dnodeInitProcessShellMsg(); dnodeInitProcessShellMsg();
taosTmrReset(dnodeSendStatusMsgToMgmt, 500, NULL, tsDnodeTmr, &tsStatusTimer);
return 0; return 0;
} }
...@@ -101,6 +162,17 @@ void dnodeInitMgmtIp() { ...@@ -101,6 +162,17 @@ void dnodeInitMgmtIp() {
} }
} }
void dnodeCleanUpMgmt() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
if (dnodeCleanUpMgmtFp) {
dnodeCleanUpMgmtFp();
}
}
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
dError("invalid msg type:%d", msgType); dError("invalid msg type:%d", msgType);
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
#include "tglobalcfg.h" #include "tglobalcfg.h"
#include "dnodeSystem.h" #include "dnodeSystem.h"
void (*dnodeParseParameterKFp)() = NULL;
/* /*
* Termination handler * Termination handler
*/ */
...@@ -63,7 +65,9 @@ int main(int argc, char *argv[]) { ...@@ -63,7 +65,9 @@ int main(int argc, char *argv[]) {
printf("buildinfo: %s\n", buildinfo); printf("buildinfo: %s\n", buildinfo);
return 0; return 0;
} else if (strcmp(argv[i], "-k") == 0) { } else if (strcmp(argv[i], "-k") == 0) {
dnodeParseParameterK(); if (dnodeParseParameterKFp) {
dnodeParseParameterKFp();
}
#ifdef TAOS_MEM_CHECK #ifdef TAOS_MEM_CHECK
} else if (strcmp(argv[i], "--alloc-random-fail") == 0) { } else if (strcmp(argv[i], "--alloc-random-fail") == 0) {
if ((i < argc - 1) && (argv[i+1][0] != '-')) { if ((i < argc - 1) && (argv[i+1][0] != '-')) {
......
...@@ -33,12 +33,12 @@ ...@@ -33,12 +33,12 @@
#include "dnodeVnodeMgmt.h" #include "dnodeVnodeMgmt.h"
#ifdef CLUSTER #ifdef CLUSTER
#include "acct.h" //#include "acct.h"
#include "admin.h" //#include "admin.h"
#include "cluster.h" //#include "cluster.h"
#include "grant.h" //#include "grant.h"
#include "replica.h" //#include "replica.h"
#include "storage.h" //#include "storage.h"
#endif #endif
static pthread_mutex_t tsDnodeMutex; static pthread_mutex_t tsDnodeMutex;
...@@ -48,8 +48,7 @@ static int32_t dnodeInitRpcQHandle(); ...@@ -48,8 +48,7 @@ static int32_t dnodeInitRpcQHandle();
static int32_t dnodeInitQueryQHandle(); static int32_t dnodeInitQueryQHandle();
static int32_t dnodeInitTmrCtl(); static int32_t dnodeInitTmrCtl();
void *tsStatusTimer = NULL; void *tsDnodeTmr;
void *vnodeTmrCtrl;
void **tsRpcQhandle; void **tsRpcQhandle;
void *tsDnodeMgmtQhandle; void *tsDnodeMgmtQhandle;
void *tsQueryQhandle; void *tsQueryQhandle;
...@@ -90,10 +89,7 @@ void dnodeCleanUpSystem() { ...@@ -90,10 +89,7 @@ void dnodeCleanUpSystem() {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
} }
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
dnodeCleanupShell(); dnodeCleanupShell();
dnodeCleanUpModules(); dnodeCleanUpModules();
...@@ -259,15 +255,15 @@ static int32_t dnodeInitQueryQHandle() { ...@@ -259,15 +255,15 @@ static int32_t dnodeInitQueryQHandle() {
int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode; int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode;
dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize, numOfThreads); dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize, numOfThreads);
tsQueryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl); tsQueryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", tsDnodeTmr);
return 0; return 0;
} }
static int32_t dnodeInitTmrCtl() { static int32_t dnodeInitTmrCtl() {
vnodeTmrCtrl = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, tsDnodeTmr = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000,
"DND-vnode"); "DND-vnode");
if (vnodeTmrCtrl == NULL) { if (tsDnodeTmr == NULL) {
dError("failed to init timer, exit"); dError("failed to init timer, exit");
return -1; return -1;
} }
...@@ -298,10 +294,6 @@ int32_t dnodeCheckSystemImp() { ...@@ -298,10 +294,6 @@ int32_t dnodeCheckSystemImp() {
int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp; int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp;
void dnodeParseParameterKImp() {}
void (*dnodeParseParameterK)() = dnodeParseParameterKImp;
int32_t dnodeInitPeersImp(int32_t numOfThreads) { int32_t dnodeInitPeersImp(int32_t numOfThreads) {
return 0; return 0;
} }
......
...@@ -59,3 +59,6 @@ bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) { ...@@ -59,3 +59,6 @@ bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) {
return true; return true;
} }
int32_t dnodeGetVnodesNum() {
return 1;
}
此差异已折叠。
...@@ -42,23 +42,19 @@ extern uint32_t tsRebootTime; ...@@ -42,23 +42,19 @@ extern uint32_t tsRebootTime;
// dnodeCluster // dnodeCluster
extern void (*dnodeStartModules)(); extern void (*dnodeStartModules)();
extern void (*dnodeParseParameterK)();
extern int32_t (*dnodeCheckSystem)(); extern int32_t (*dnodeCheckSystem)();
// dnodeSystem // dnodeSystem
extern void *tsDnodeMgmtQhandle; extern void *tsDnodeMgmtQhandle;
void dnodeCheckDataDirOpenned(const char* dir);
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
// dnodeModule // dnodeModule
extern void (*dnodeStartModules)(); extern void (*dnodeStartModules)();
// multilevelStorage
extern int32_t (*dnodeInitStorage)();
extern void (*dnodeCleanupStorage)();
void dnodeCheckDataDirOpenned(const char* dir);
void dnodeLockVnodes(); void dnodeLockVnodes();
void dnodeUnLockVnodes(); void dnodeUnLockVnodes();
......
...@@ -258,14 +258,12 @@ typedef struct { ...@@ -258,14 +258,12 @@ typedef struct {
} SShowObj; } SShowObj;
//mgmtSystem //mgmtSystem
int32_t mgmtInitSystem();
int32_t mgmtStartSystem(); int32_t mgmtStartSystem();
void mgmtCleanUpSystem(); void mgmtCleanUpSystem();
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int contLen, void *pConn, int32_t code); void mgmtStopSystem();
extern int32_t (*mgmtInitSystem)();
extern void (*mgmtStopSystem)();
extern void (*mgmtCleanUpRedirect)();
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -163,6 +163,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismat ...@@ -163,6 +163,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismat
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased") TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 121, "invalid table typee") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 121, "invalid table typee")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG_VERSION, 0, 122, "invalid version of message")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NOT_EXIST, 0, 123, "dnode not exist")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
......
...@@ -309,8 +309,8 @@ typedef struct { ...@@ -309,8 +309,8 @@ typedef struct {
} SAcctCfg; } SAcctCfg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN + 1]; char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1]; char pass[TSDB_KEY_LEN + 1];
SAcctCfg cfg; SAcctCfg cfg;
} SCreateAcctMsg, SAlterAcctMsg; } SCreateAcctMsg, SAlterAcctMsg;
...@@ -572,33 +572,35 @@ typedef struct { ...@@ -572,33 +572,35 @@ typedef struct {
char reserved[64]; char reserved[64];
} SVnodeStatisticInfo; } SVnodeStatisticInfo;
typedef struct {
uint32_t moduleStatus;
uint32_t createdTime;
uint32_t numOfVnodes;
uint32_t reserved;
} SDnodeState;
typedef struct { typedef struct {
uint32_t version; uint32_t version;
uint32_t privateIp;
uint32_t publicIp; uint32_t publicIp;
uint32_t lastReboot; // time stamp for last reboot uint32_t lastReboot; // time stamp for last reboot
uint16_t numOfTotalVnodes; // from config file
uint16_t openVnodes;
uint16_t numOfCores; uint16_t numOfCores;
float diskAvailable; // GB
uint8_t alternativeRole; uint8_t alternativeRole;
uint8_t reserve; uint8_t reserve[15];
uint16_t numOfTotalVnodes; // from config file
uint16_t unused;
float diskAvailable; // GB
uint32_t openVnodes;
char reserved[16];
SVnodeLoad load[]; SVnodeLoad load[];
} SStatusMsg; } SStatusMsg;
typedef struct { typedef struct {
int32_t code; int32_t code;
SRpcIpSet ipList; int32_t numOfVnodes;
SDnodeState dnodeState;
SRpcIpSet ipList;
SVnodeAccess vnodeAccess[];
} SStatusRsp; } SStatusRsp;
typedef struct {
uint32_t moduleStatus;
uint32_t createdTime;
uint32_t numOfVnodes;
uint32_t reserved;
} SDnodeState;
// internal message // internal message
typedef struct { typedef struct {
uint32_t destId; uint32_t destId;
......
...@@ -22,19 +22,24 @@ extern "C" { ...@@ -22,19 +22,24 @@ extern "C" {
#include "mnode.h" #include "mnode.h"
int32_t mgmtInitAccts();
void mgmtCleanUpAccts();
SAcctObj *mgmtGetAcct(char *acctName);
int32_t mgmtCheckUserLimit(SAcctObj *pAcct);
int32_t mgmtCheckDbLimit(SAcctObj *pAcct);
int32_t mgmtCheckTableLimit(SAcctObj *pAcct, int32_t numOfTimeSeries);
int32_t mgmtGetAcctMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveAccts(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb); int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb);
int32_t mgmtRemoveDbFromAcct(SAcctObj *pAcct, SDbObj *pDb); int32_t mgmtRemoveDbFromAcct(SAcctObj *pAcct, SDbObj *pDb);
int32_t mgmtAddUserIntoAcct(SAcctObj *pAcct, SUserObj *pUser); int32_t mgmtAddUserIntoAcct(SAcctObj *pAcct, SUserObj *pUser);
int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser); int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser);
extern int32_t (*mgmtInitAccts)(); extern int32_t (*mgmtCreateAcctFp)(char *name, char *pass, SAcctCfg *pCfg);
extern void (*mgmtCleanUpAccts)(); extern int32_t (*mgmtDropAcctFp)(char *name);
extern SAcctObj* (*mgmtGetAcct)(char *acctName); extern int32_t (*mgmtAlterAcctFp)(char *name, char *pass, SAcctCfg *pCfg);
extern int32_t (*mgmtCheckUserLimit)(SAcctObj *pAcct);
extern int32_t (*mgmtCheckDbLimit)(SAcctObj *pAcct);
extern int32_t (*mgmtCheckTableLimit)(SAcctObj *pAcct, SCreateTableMsg *pCreate);
extern int32_t (*mgmtGetAcctMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn);
extern int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,19 +20,13 @@ ...@@ -20,19 +20,13 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#include "mnode.h" #include "mnode.h"
extern void (*mgmtStartBalanceTimer)(int64_t mseconds); void mgmtStartBalanceTimer(int64_t mseconds);
extern int32_t (*mgmtInitBalance)(); int32_t mgmtInitBalance();
extern void (*mgmtCleanupBalance)(); void mgmtCleanupBalance();
extern int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup); int32_t mgmtAllocVnodes(SVgObj *pVgroup);
extern bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType); char* mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode);
extern char* (*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode);
extern bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode);
extern void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus);
extern void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp);
extern bool (*mgmtAddVnode)(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,10 +24,6 @@ extern "C" { ...@@ -24,10 +24,6 @@ extern "C" {
#include <stdbool.h> #include <stdbool.h>
#include "mnode.h" #include "mnode.h"
int32_t mgmtCreateDnode(uint32_t ip);
int32_t mgmtDropDnode(SDnodeObj *pDnode);
int32_t mgmtDropDnodeByIp(uint32_t ip);
int32_t mgmtGetNextVnode(SVnodeGid *pVnodeGid);
void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId); void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId);
void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes); void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes);
int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
...@@ -38,25 +34,24 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode); ...@@ -38,25 +34,24 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode);
int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType);
int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
extern int32_t (*mgmtInitDnodes)(); int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
extern void (*mgmtCleanUpDnodes)(); int32_t mgmtRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn);
extern SDnodeObj* (*mgmtGetDnode)(uint32_t ip);
extern int32_t (*mgmtGetDnodesNum)(); int32_t mgmtInitDnodes();
extern void* (*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode); void mgmtCleanUpDnodes();
extern int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode); int32_t mgmtGetDnodesNum();
extern void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode); int32_t mgmtUpdateDnode(SDnodeObj *pDnode);
extern int32_t (*mgmtGetScoresMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); void* mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode);
extern int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, void *pConn); bool mgmtCheckConfigShow(SGlobalConfig *cfg);
extern bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg); void mgmtSetDnodeUnRemove(SDnodeObj *pDnode);
SDnodeObj* mgmtGetDnode(uint32_t ip);
extern SDnodeObj tsDnodeObj;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -34,12 +34,12 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); ...@@ -34,12 +34,12 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle); void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle); void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle);
extern int32_t (*mgmtInitDnodeInt)(); int32_t mgmtInitDnodeInt();
extern void (*mgmtCleanUpDnodeInt)(); void mgmtCleanUpDnodeInt();
extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId);
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle); void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle);
void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -17,21 +17,23 @@ ...@@ -17,21 +17,23 @@
#define TDENGINE_MGMT_GTANT_H #define TDENGINE_MGMT_GTANT_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { "C" {
#endif #endif
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
#include "mnode.h" #include "mnode.h"
extern bool (*mgmtCheckExpired)(); bool mgmtCheckExpired();
extern void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum); void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum);
extern void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeseries); void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries);
extern int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries); int32_t mgmtCheckTimeSeries(uint32_t timeseries);
extern int32_t (*mgmtCheckUserGrant)(); int32_t mgmtCheckUserGrant();
extern int32_t (*mgmtCheckDbGrant)(); int32_t mgmtCheckDbGrant();
extern int32_t (*mgmtGetGrantsMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
extern int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, void *pConn); int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn);
extern void (*mgmtUpdateGrantInfoFp)(void *pCont);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,8 +24,8 @@ extern "C" { ...@@ -24,8 +24,8 @@ extern "C" {
#include <stdbool.h> #include <stdbool.h>
#include "mnode.h" #include "mnode.h"
extern int32_t (*mgmtGetMnodeMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
extern int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -28,13 +28,10 @@ int32_t mgmtInitShell(); ...@@ -28,13 +28,10 @@ int32_t mgmtInitShell();
void mgmtCleanUpShell(); void mgmtCleanUpShell();
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn); extern int32_t (*mgmtCheckRedirectMsg)(void *pConn);
extern void (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
/* /*
* If table not exist, will create it * If table not exist, will create it
......
...@@ -22,14 +22,10 @@ extern "C" { ...@@ -22,14 +22,10 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
int32_t mgmtInitSystem();
int32_t mgmtStartSystem(); int32_t mgmtStartSystem();
void mgmtCleanUpSystem(); void mgmtCleanUpSystem();
void mgmtStopSystem();
extern int32_t (*mgmtInitSystem)();
extern int32_t (*mgmtCheckMgmtRunning)();
extern void (*mgmtDoStatistic)(void *handle, void *tmrId);
extern void (*mgmtStopSystem)();
extern void (*mgmtCleanUpRedirect)();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,6 +26,20 @@ extern void *tsUserSdb; ...@@ -26,6 +26,20 @@ extern void *tsUserSdb;
extern void *tsDbSdb; extern void *tsDbSdb;
static SAcctObj tsAcctObj; static SAcctObj tsAcctObj;
int32_t (*mgmtInitAcctsFp)() = NULL;
void (*mgmtCleanUpAcctsFp)() = NULL;
int32_t (*mgmtCreateAcctFp)(char *name, char *pass, SAcctCfg *pCfg) = NULL;
int32_t (*mgmtDropAcctFp)(char *name) = NULL;
int32_t (*mgmtAlterAcctFp)(char *name, char *pass, SAcctCfg *pCfg) = NULL;
int32_t (*mgmtGetAcctMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL;
int32_t (*mgmtRetrieveAcctsFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL;
SAcctObj *(*mgmtGetAcctFp)(char *acctName) = NULL;
int32_t (*mgmtCheckUserLimitFp)(SAcctObj *pAcct) = NULL;
int32_t (*mgmtCheckDbLimitFp)(SAcctObj *pAcct) = NULL;
int32_t (*mgmtCheckTimeSeriesLimitFp)(SAcctObj *pAcct, int32_t numOfTimeSeries) = NULL;
int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb) { int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb) {
pthread_mutex_lock(&pAcct->mutex); pthread_mutex_lock(&pAcct->mutex);
pDb->next = pAcct->pHead; pDb->next = pAcct->pHead;
...@@ -97,73 +111,90 @@ int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) { ...@@ -97,73 +111,90 @@ int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) {
return 0; return 0;
} }
int32_t mgmtInitAcctsImp() { int32_t mgmtInitAccts() {
SAcctObj *pAcct = &tsAcctObj; if (mgmtInitAcctsFp) {
pAcct->acctId = 0; return mgmtInitAcctsFp();
strcpy(pAcct->user, "root"); } else {
return 0; SAcctObj *pAcct = &tsAcctObj;
pAcct->acctId = 0;
strcpy(pAcct->user, "root");
return 0;
}
} }
int32_t (*mgmtInitAccts)() = mgmtInitAcctsImp; SAcctObj *mgmtGetAcct(char *acctName) {
if (mgmtGetAcctFp) {
static SAcctObj *mgmtGetAcctImp(char *acctName) { return mgmtGetAcctFp(acctName);
return &tsAcctObj; } else {
return &tsAcctObj;
}
} }
SAcctObj *(*mgmtGetAcct)(char *acctName) = mgmtGetAcctImp; int32_t mgmtCheckUserLimit(SAcctObj *pAcct) {
if (mgmtCheckUserLimitFp) {
static int32_t mgmtCheckUserLimitImp(SAcctObj *pAcct) { return mgmtCheckUserLimitFp(pAcct);
int32_t numOfUsers = sdbGetNumOfRows(tsUserSdb); } else {
if (numOfUsers >= tsMaxUsers) { int32_t numOfUsers = sdbGetNumOfRows(tsUserSdb);
mWarn("numOfUsers:%d, exceed tsMaxUsers:%d", numOfUsers, tsMaxUsers); if (numOfUsers >= tsMaxUsers) {
return TSDB_CODE_TOO_MANY_USERS; mWarn("numOfUsers:%d, exceed tsMaxUsers:%d", numOfUsers, tsMaxUsers);
return TSDB_CODE_TOO_MANY_USERS;
}
return 0;
} }
return 0;
} }
int32_t (*mgmtCheckUserLimit)(SAcctObj *pAcct) = mgmtCheckUserLimitImp; int32_t mgmtCheckDbLimit(SAcctObj *pAcct) {
if (mgmtCheckDbLimitFp) {
static int32_t mgmtCheckDbLimitImp(SAcctObj *pAcct) { return mgmtCheckDbLimitFp(pAcct);
int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb); } else {
if (numOfDbs >= tsMaxDbs) { int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb);
mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs); if (numOfDbs >= tsMaxDbs) {
return TSDB_CODE_TOO_MANY_DATABASES; mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs);
return TSDB_CODE_TOO_MANY_DATABASES;
}
return 0;
} }
return 0;
} }
int32_t (*mgmtCheckDbLimit)(SAcctObj *pAcct) = mgmtCheckDbLimitImp; int32_t mgmtCheckTableLimit(SAcctObj *pAcct, int32_t numOfTimeSeries) {
if (mgmtCheckTimeSeriesLimitFp) {
static int32_t mgmtCheckTableLimitImp(SAcctObj *pAcct, SCreateTableMsg *pCreate) { return mgmtCheckTimeSeriesLimitFp(pAcct, numOfTimeSeries);
return 0; } else {
return 0;
}
} }
int32_t (*mgmtCheckTableLimit)(SAcctObj *pAcct, SCreateTableMsg *pCreate) = mgmtCheckTableLimitImp; void mgmtCleanUpAccts() {
if (mgmtCleanUpAcctsFp) {
static void mgmtCleanUpAcctsImp() { mgmtCleanUpAcctsFp();
}
} }
void (*mgmtCleanUpAccts)() = mgmtCleanUpAcctsImp; int32_t mgmtGetAcctMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
if (mgmtGetAcctMetaFp) {
static int32_t mgmtGetAcctMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { return mgmtGetAcctMetaFp(pMeta, pShow, pConn);
return TSDB_CODE_OPS_NOT_SUPPORT; } else {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
} }
int32_t (*mgmtGetAcctMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetAcctMetaImp; int32_t mgmtRetrieveAccts(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
if (mgmtRetrieveAcctsFp) {
static int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) { return mgmtRetrieveAcctsFp(pShow, data, rows, pConn);
return 0; } else {
return 0;
}
} }
int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveAcctsImp;
SAcctObj *mgmtGetAcctFromConn(void *pConn) { SAcctObj *mgmtGetAcctFromConn(void *pConn) {
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
rpcGetConnInfo(pConn, &connInfo); rpcGetConnInfo(pConn, &connInfo);
SUserObj *pUser = mgmtGetUser(connInfo.user);
if(pUser != NULL) { SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser != NULL) {
return pUser->pAcct; return pUser->pAcct;
} }
return NULL; return NULL;
} }
...@@ -21,19 +21,42 @@ ...@@ -21,19 +21,42 @@
#include "mgmtBalance.h" #include "mgmtBalance.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
void mgmtStartBalanceTimerImp(int64_t mseconds) {} void (*mgmtStartBalanceTimerFp)(int64_t mseconds) = NULL;
void (*mgmtStartBalanceTimer)(int64_t mseconds) = mgmtStartBalanceTimerImp; int32_t (*mgmtInitBalanceFp)() = NULL;
void (*mgmtCleanupBalanceFp)() = NULL;
int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL;
char * (*mgmtGetVnodeStatusFp)(SVgObj *pVgroup, SVnodeGid *pVnode) = NULL;
void mgmtStartBalanceTimer(int64_t mseconds) {
if (mgmtStartBalanceTimerFp) {
(*mgmtStartBalanceTimerFp)(mseconds);
}
}
int32_t mgmtInitBalanceImp() { return 0; } int32_t mgmtInitBalance() {
int32_t (*mgmtInitBalance)() = mgmtInitBalanceImp; if (mgmtInitBalanceFp) {
return (*mgmtInitBalanceFp)();
} else {
return 0;
}
}
void mgmtCleanupBalanceImp() {} void mgmtCleanupBalance() {
void (*mgmtCleanupBalance)() = mgmtCleanupBalanceImp; if (mgmtCleanupBalanceFp) {
(*mgmtCleanupBalanceFp)();
}
}
int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) { int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
int selectedVnode = -1; if (mgmtAllocVnodesFp) {
SDnodeObj *pDnode = &tsDnodeObj; return mgmtAllocVnodesFp(pVgroup);
int lastAllocVode = pDnode->lastAllocVnode; }
SDnodeObj *pDnode = mgmtGetDnode(0);
if (pDnode == NULL) return TSDB_CODE_OTHERS;
int selectedVnode = -1;
int lastAllocVode = pDnode->lastAllocVnode;
for (int i = 0; i < pDnode->numOfVnodes; i++) { for (int i = 0; i < pDnode->numOfVnodes; i++) {
int vnode = (i + lastAllocVode) % pDnode->numOfVnodes; int vnode = (i + lastAllocVode) % pDnode->numOfVnodes;
...@@ -49,44 +72,16 @@ int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) { ...@@ -49,44 +72,16 @@ int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) {
} else { } else {
mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode); mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode);
pVgroup->vnodeGid[0].vnode = selectedVnode; pVgroup->vnodeGid[0].vnode = selectedVnode;
pDnode->lastAllocVnode = selectedVnode + 1; pDnode->lastAllocVnode = selectedVnode + 1;
if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0; if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0;
return 0; return 0;
} }
} }
int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup) = mgmtAllocVnodesImp;
bool mgmtCheckModuleInDnodeImp(SDnodeObj *pDnode, int moduleType) {
return tsModule[moduleType].num != 0;
}
bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType) = mgmtCheckModuleInDnodeImp;
char *mgmtGetVnodeStatusImp(SVgObj *pVgroup, SVnodeGid *pVnode) {
return "master";
}
char *(*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtGetVnodeStatusImp;
bool mgmtCheckVnodeReadyImp(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) {
return true;
}
bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtCheckVnodeReadyImp;
void mgmtUpdateDnodeStateImp(SDnodeObj *pDnode, int lbStatus) {
}
void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus) = mgmtUpdateDnodeStateImp;
void mgmtUpdateVgroupStateImp(SVgObj *pVgroup, int lbStatus, int srcIp) { char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
} if (mgmtGetVnodeStatusFp) {
return (*mgmtGetVnodeStatusFp)(pVgroup, pVnode);
void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp) = mgmtUpdateVgroupStateImp; } else {
return "master";
bool mgmtAddVnodeImp(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { }
return false;
} }
bool (*mgmtAddVnode)(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) = mgmtAddVnodeImp;
...@@ -442,7 +442,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName ...@@ -442,7 +442,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
// //
// mgmtMeterActionEncode(pTable, msg, size, &rowSize); // mgmtMeterActionEncode(pTable, msg, size, &rowSize);
// //
// int32_t ret = sdbUpdateRow(meterSdb, msg, rowSize, 1); // Need callback function // int32_t ret = sdbUpdateRow(tsChildTableSdb, msg, rowSize, 1); // Need callback function
// tfree(msg); // tfree(msg);
// //
// if (pTable->isDirty) pTable->isDirty = 0; // if (pTable->isDirty) pTable->isDirty = 0;
......
...@@ -409,68 +409,69 @@ void mgmtMonitorDbDrop(void *unused, void *unusedt) { ...@@ -409,68 +409,69 @@ void mgmtMonitorDbDrop(void *unused, void *unusedt) {
} }
int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
int32_t code = TSDB_CODE_SUCCESS; return 0;
// int32_t code = TSDB_CODE_SUCCESS;
SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db); //
if (pDb == NULL) { // SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db);
mTrace("db:%s is not exist", pAlter->db); // if (pDb == NULL) {
return TSDB_CODE_INVALID_DB; // mTrace("db:%s is not exist", pAlter->db);
} // return TSDB_CODE_INVALID_DB;
// }
int32_t oldReplicaNum = pDb->cfg.replications; //
if (pAlter->daysToKeep > 0) { // int32_t oldReplicaNum = pDb->cfg.replications;
mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep); // if (pAlter->daysToKeep > 0) {
pDb->cfg.daysToKeep = pAlter->daysToKeep; // mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep);
} else if (pAlter->replications > 0) { // pDb->cfg.daysToKeep = pAlter->daysToKeep;
mTrace("db:%s replica:%d change to %d", pDb->name, pDb->cfg.replications, pAlter->replications); // } else if (pAlter->replications > 0) {
if (pAlter->replications < TSDB_REPLICA_MIN_NUM || pAlter->replications > TSDB_REPLICA_MAX_NUM) { // mTrace("db:%s replica:%d change to %d", pDb->name, pDb->cfg.replications, pAlter->replications);
mError("invalid db option replica: %d valid range: %d--%d", pAlter->replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM); // if (pAlter->replications < TSDB_REPLICA_MIN_NUM || pAlter->replications > TSDB_REPLICA_MAX_NUM) {
return TSDB_CODE_INVALID_OPTION; // mError("invalid db option replica: %d valid range: %d--%d", pAlter->replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM);
} // return TSDB_CODE_INVALID_OPTION;
pDb->cfg.replications = pAlter->replications; // }
} else if (pAlter->maxSessions > 0) { // pDb->cfg.replications = pAlter->replications;
mTrace("db:%s tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, pAlter->maxSessions); // } else if (pAlter->maxSessions > 0) {
if (pAlter->maxSessions < TSDB_MIN_TABLES_PER_VNODE || pAlter->maxSessions > TSDB_MAX_TABLES_PER_VNODE) { // mTrace("db:%s tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, pAlter->maxSessions);
mError("invalid db option tables: %d valid range: %d--%d", pAlter->maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE); // if (pAlter->maxSessions < TSDB_MIN_TABLES_PER_VNODE || pAlter->maxSessions > TSDB_MAX_TABLES_PER_VNODE) {
return TSDB_CODE_INVALID_OPTION; // mError("invalid db option tables: %d valid range: %d--%d", pAlter->maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE);
} // return TSDB_CODE_INVALID_OPTION;
if (pAlter->maxSessions < pDb->cfg.maxSessions) { // }
mError("invalid db option tables: %d should larger than original:%d", pAlter->maxSessions, pDb->cfg.maxSessions); // if (pAlter->maxSessions < pDb->cfg.maxSessions) {
return TSDB_CODE_INVALID_OPTION; // mError("invalid db option tables: %d should larger than original:%d", pAlter->maxSessions, pDb->cfg.maxSessions);
} // return TSDB_CODE_INVALID_OPTION;
return TSDB_CODE_INVALID_OPTION; // }
//The modification of tables needs to rewrite the head file, so disable this option // return TSDB_CODE_INVALID_OPTION;
//pDb->cfg.maxSessions = pAlter->maxSessions; // //The modification of tables needs to rewrite the head file, so disable this option
} else { // //pDb->cfg.maxSessions = pAlter->maxSessions;
mError("db:%s alter msg, replica:%d, keep:%d, tables:%d, origin replica:%d keep:%d", pDb->name, // } else {
pAlter->replications, pAlter->maxSessions, pAlter->daysToKeep, // mError("db:%s alter msg, replica:%d, keep:%d, tables:%d, origin replica:%d keep:%d", pDb->name,
pDb->cfg.replications, pDb->cfg.daysToKeep); // pAlter->replications, pAlter->maxSessions, pAlter->daysToKeep,
return TSDB_CODE_INVALID_OPTION; // pDb->cfg.replications, pDb->cfg.daysToKeep);
} // return TSDB_CODE_INVALID_OPTION;
// }
if (sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1) < 0) { //
return TSDB_CODE_SDB_ERROR; // if (sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1) < 0) {
} // return TSDB_CODE_SDB_ERROR;
// }
SVgObj *pVgroup = pDb->pHead; //
while (pVgroup != NULL) { // SVgObj *pVgroup = pDb->pHead;
mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); // while (pVgroup != NULL) {
if (oldReplicaNum < pDb->cfg.replications) { // mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0);
if (!mgmtAddVnode(pVgroup, NULL, NULL)) { // if (oldReplicaNum < pDb->cfg.replications) {
mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); // if (!mgmtAddVnode(pVgroup, NULL, NULL)) {
code = TSDB_CODE_NO_ENOUGH_DNODES; // mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId);
} // code = TSDB_CODE_NO_ENOUGH_DNODES;
} // }
if (pAlter->maxSessions > 0) { // }
//rebuild meterList in mgmtVgroup.c // if (pAlter->maxSessions > 0) {
mgmtUpdateVgroup(pVgroup); // //rebuild meterList in mgmtVgroup.c
} // mgmtUpdateVgroup(pVgroup);
// mgmtSendCreateVnodeMsg(pVgroup); // }
pVgroup = pVgroup->next; //// mgmtSendCreateVnodeMsg(pVgroup);
} // pVgroup = pVgroup->next;
mgmtStartBalanceTimer(10); // }
// mgmtStartBalanceTimer(10);
return code; //
// return code;
} }
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) { int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) {
......
...@@ -24,7 +24,17 @@ ...@@ -24,7 +24,17 @@
#include "mgmtUser.h" #include "mgmtUser.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
SDnodeObj tsDnodeObj; int32_t (*mgmtInitDnodesFp)() = NULL;
void (*mgmtCleanUpDnodesFp)() = NULL;
SDnodeObj *(*mgmtGetDnodeFp)(uint32_t ip) = NULL;
int32_t (*mgmtGetDnodesNumFp)() = NULL;
int32_t (*mgmtUpdateDnodeFp)(SDnodeObj *pDnode) = NULL;
void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL;
int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL;
int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL;
void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL;
static SDnodeObj tsDnodeObj = {0};
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore;
...@@ -154,7 +164,9 @@ int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { ...@@ -154,7 +164,9 @@ int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
pShow->offset[0] = 0; pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = mgmtGetDnodesNum(); pShow->numOfRows = mgmtGetDnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
...@@ -165,9 +177,9 @@ int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { ...@@ -165,9 +177,9 @@ int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t cols = 0;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
char *pWrite; char *pWrite;
int32_t cols = 0;
char ipstr[20]; char ipstr[20];
while (numOfRows < rows) { while (numOfRows < rows) {
...@@ -213,6 +225,11 @@ int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon ...@@ -213,6 +225,11 @@ int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon
return numOfRows; return numOfRows;
} }
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
uint32_t status = pDnode->moduleStatus & (1 << moduleType);
return status > 0;
}
int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
...@@ -517,85 +534,102 @@ int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon ...@@ -517,85 +534,102 @@ int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon
return numOfRows; return numOfRows;
} }
SDnodeObj *mgmtGetDnodeImp(uint32_t ip) { int32_t mgmtInitDnodes() {
return &tsDnodeObj; if (mgmtInitDnodesFp) {
} return mgmtInitDnodesFp();
} else {
SDnodeObj *(*mgmtGetDnode)(uint32_t ip) = mgmtGetDnodeImp; tsDnodeObj.privateIp = inet_addr(tsPrivateIp);;
tsDnodeObj.createdTime = taosGetTimestampMs();
tsDnodeObj.lastReboot = taosGetTimestampSec();
tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores;
tsDnodeObj.status = TSDB_DN_STATUS_READY;
tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY;
tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
tsDnodeObj.thandle = (void *) (1); //hack way
tsDnodeObj.status = TSDB_DN_STATUS_READY;
mgmtSetDnodeMaxVnodes(&tsDnodeObj);
int32_t mgmtUpdateDnodeImp(SDnodeObj *pDnode) { tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT);
return 0; if (tsEnableHttpModule) {
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_HTTP);
}
if (tsEnableMonitorModule) {
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MONITOR);
}
return 0;
}
} }
int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode) = mgmtUpdateDnodeImp; void mgmtCleanUpDnodes() {
if (mgmtCleanUpDnodesFp) {
void mgmtCleanUpDnodesImp() { mgmtCleanUpDnodesFp();
}
} }
void (*mgmtCleanUpDnodes)() = mgmtCleanUpDnodesImp; SDnodeObj *mgmtGetDnode(uint32_t ip) {
if (mgmtGetDnodeFp) {
int32_t mgmtInitDnodesImp() { return mgmtGetDnodeFp(ip);
tsDnodeObj.privateIp = inet_addr(tsPrivateIp);; } else {
tsDnodeObj.createdTime = taosGetTimestampMs(); return &tsDnodeObj;
tsDnodeObj.lastReboot = taosGetTimestampSec();
tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores;
tsDnodeObj.status = TSDB_DN_STATUS_READY;
tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY;
tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
tsDnodeObj.thandle = (void *) (1); //hack way
if (tsDnodeObj.numOfVnodes == TSDB_INVALID_VNODE_NUM) {
mgmtSetDnodeMaxVnodes(&tsDnodeObj);
mPrint("dnode first access, set total vnodes:%d", tsDnodeObj.numOfVnodes);
} }
tsDnodeObj.status = TSDB_DN_STATUS_READY;
return 0;
} }
int32_t (*mgmtInitDnodes)() = mgmtInitDnodesImp; int32_t mgmtGetDnodesNum() {
if (mgmtGetDnodesNumFp) {
int32_t mgmtGetDnodesNumImp() { return mgmtGetDnodesNumFp();
return 1; } else {
return 1;
}
} }
int32_t (*mgmtGetDnodesNum)() = mgmtGetDnodesNumImp; int32_t mgmtUpdateDnode(SDnodeObj *pDnode) {
if (mgmtUpdateDnodeFp) {
return mgmtUpdateDnodeFp(pDnode);
} else {
return 0;
}
}
void *mgmtGetNextDnodeImp(SShowObj *pShow, SDnodeObj **pDnode) { void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) {
if (*pDnode == NULL) { if (mgmtGetNextDnodeFp) {
*pDnode = &tsDnodeObj; return mgmtGetNextDnodeFp(pShow, pDnode);
} else { } else {
*pDnode = NULL; if (*pDnode == NULL) {
*pDnode = &tsDnodeObj;
} else {
*pDnode = NULL;
}
} }
return *pDnode; return *pDnode;
} }
void *(*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode) = mgmtGetNextDnodeImp; int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
if (mgmtGetScoresMetaFp) {
int32_t mgmtGetScoresMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { return mgmtGetScoresMetaFp(pMeta, pShow, pConn);
return TSDB_CODE_OPS_NOT_SUPPORT; } else {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
} }
int32_t (*mgmtGetScoresMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetScoresMetaImp; int32_t mgmtRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
if (mgmtRetrieveScoresFp) {
int32_t mgmtRetrieveScoresImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) { return mgmtRetrieveScoresFp(pShow, data, rows, pConn);
return 0; } else {
return 0;
}
} }
int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveScoresImp; void mgmtSetDnodeUnRemove(SDnodeObj *pDnode) {
if (mgmtSetDnodeUnRemoveFp) {
void mgmtSetDnodeUnRemoveImp(SDnodeObj *pDnode) { mgmtSetDnodeUnRemoveFp(pDnode);
}
} }
void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode) = mgmtSetDnodeUnRemoveImp; bool mgmtCheckConfigShow(SGlobalConfig *cfg) {
bool mgmtCheckConfigShowImp(SGlobalConfig *cfg) {
if (cfg->cfgType & TSDB_CFG_CTYPE_B_CLUSTER) if (cfg->cfgType & TSDB_CFG_CTYPE_B_CLUSTER)
return false; return false;
if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT)
return false; return false;
return true; return true;
} }
bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg) = mgmtCheckConfigShowImp;
...@@ -26,15 +26,21 @@ ...@@ -26,15 +26,21 @@
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mgmtDnodeInt.h" #include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
int32_t (*mgmtInitDnodeIntFp)() = NULL;
void (*mgmtCleanUpDnodeIntFp)() = NULL;
void (*mgmtSendMsgToDnodeFp)(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL; void (*mgmtSendMsgToDnodeFp)(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL;
void (*mgmtSendRspToDnodeFp)(void *handle, int32_t code, void *pCont, int32_t contLen) = NULL; void (*mgmtSendRspToDnodeFp)(void *handle, int32_t code, void *pCont, int32_t contLen) = NULL;
void *mgmtStatusTimer = NULL; void *mgmtStatusTimer = NULL;
static void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) { static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) {
int32_t contLen = *(int32_t *) (sched->msg - 4); int32_t contLen = *(int32_t *) (sched->msg - 4);
int32_t code = *(int32_t *) (sched->msg - 8); int32_t code = *(int32_t *) (sched->msg - 8);
...@@ -225,6 +231,14 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo ...@@ -225,6 +231,14 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo
} }
} }
static void mgmtProcessDnodeGrantMsg(void *pCont, void *thandle) {
if (mgmtUpdateGrantInfoFp) {
mgmtUpdateGrantInfoFp(pCont);
mTrace("grant info is updated");
}
rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0);
}
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
mError("invalid msg type:%d", msgType); mError("invalid msg type:%d", msgType);
...@@ -249,6 +263,10 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p ...@@ -249,6 +263,10 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p
mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code); mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) {
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
} else if (msgType == TSDB_MSG_TYPE_STATUS) {
mgmtProcessDnodeStatus(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_GRANT) {
mgmtProcessDnodeGrantMsg(pCont, pConn);
} else { } else {
mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]); mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]);
} }
...@@ -256,8 +274,6 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p ...@@ -256,8 +274,6 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p
//rpcFreeCont(pCont); //rpcFreeCont(pCont);
} }
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle); mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle);
} }
...@@ -317,108 +333,113 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { ...@@ -317,108 +333,113 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
} }
int32_t mgmtSendCfgDnodeMsg(char *cont) { int32_t mgmtSendCfgDnodeMsg(char *cont) {
#ifdef CLUSTER //#ifdef CLUSTER
char * pMsg, *pStart; // char * pMsg, *pStart;
int32_t msgLen = 0; // int32_t msgLen = 0;
#endif //#endif
//
SDnodeObj *pDnode; // SDnodeObj *pDnode;
SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont; // SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont;
uint32_t ip; // uint32_t ip;
//
ip = inet_addr(pCfg->ip); // ip = inet_addr(pCfg->ip);
pDnode = mgmtGetDnode(ip); // pDnode = mgmtGetDnode(ip);
if (pDnode == NULL) { // if (pDnode == NULL) {
mError("dnode ip:%s not configured", pCfg->ip); // mError("dnode ip:%s not configured", pCfg->ip);
return TSDB_CODE_NOT_CONFIGURED; // return TSDB_CODE_NOT_CONFIGURED;
} // }
//
mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config); // mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config);
int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config); // int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config);
if (code != -1) { // if (code != -1) {
return code; // return code;
} // }
//
#ifdef CLUSTER //#ifdef CLUSTER
pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG); // pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG);
if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE; // if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE;
pMsg = pStart; // pMsg = pStart;
//
memcpy(pMsg, cont, sizeof(SCfgDnodeMsg)); // memcpy(pMsg, cont, sizeof(SCfgDnodeMsg));
pMsg += sizeof(SCfgDnodeMsg); // pMsg += sizeof(SCfgDnodeMsg);
//
msgLen = pMsg - pStart; // msgLen = pMsg - pStart;
mgmtSendMsgToDnode(pDnode, pStart, msgLen); // mgmtSendMsgToDnode(pDnode, pStart, msgLen);
#else //#else
(void)tsCfgDynamicOptions(pCfg->config); // (void)tsCfgDynamicOptions(pCfg->config);
#endif //#endif
return 0; // return 0;
} }
int32_t mgmtInitDnodeIntImp() { return 0; } int32_t mgmtInitDnodeInt() {
int32_t (*mgmtInitDnodeInt)() = mgmtInitDnodeIntImp; if (mgmtInitDnodeIntFp) {
return mgmtInitDnodeIntFp();
void mgmtCleanUpDnodeIntImp() {} } else {
void (*mgmtCleanUpDnodeInt)() = mgmtCleanUpDnodeIntImp; return 0;
}
void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { }
/*
SDnodeObj *pObj = &tsDnodeObj;
pObj->openVnodes = tsOpenVnodes;
pObj->status = TSDB_DN_STATUS_READY;
float memoryUsedMB = 0; void mgmtCleanUpDnodeInt() {
taosGetSysMemory(&memoryUsedMB); if (mgmtCleanUpDnodeIntFp) {
pObj->diskAvailable = tsAvailDataDirGB; mgmtCleanUpDnodeIntFp();
}
for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) { }
SVnodeLoad *pVload = &(pObj->vload[vnode]);
SVnodeObj * pVnode = vnodeList + vnode;
// wait vnode dropped
if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
if (vnodeList[vnode].cfg.maxSessions <= 0) {
pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
pVload->status = TSDB_VN_STATUS_OFFLINE;
mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode);
taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr);
}
}
if (vnodeList[vnode].cfg.maxSessions <= 0) { void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
continue; SStatusMsg *pStatus = (SStatusMsg *)pCont;
}
pVload->vnode = vnode; SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp));
pVload->status = TSDB_VN_STATUS_MASTER; if (pObj == NULL) {
pVload->totalStorage = pVnode->vnodeStatistic.totalStorage; mError("dnode:%s not exist", taosIpStr(pObj->privateIp));
pVload->compStorage = pVnode->vnodeStatistic.compStorage; mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_DNODE_NOT_EXIST, NULL, 0);
pVload->pointsWritten = pVnode->vnodeStatistic.pointsWritten; return;
uint32_t vgId = pVnode->cfg.vgId; }
SVgObj *pVgroup = mgmtGetVgroup(vgId);
if (pVgroup == NULL) {
mError("vgroup:%d is not there, but associated with vnode %d", vgId, vnode);
pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING;
continue;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName); pObj->lastReboot = htonl(pStatus->lastReboot);
if (pDb == NULL) { pObj->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
mError("vgroup:%d not belongs to any database, vnode:%d", vgId, vnode); pObj->openVnodes = htons(pStatus->openVnodes);
continue; pObj->numOfCores = htons(pStatus->numOfCores);
} pObj->diskAvailable = pStatus->diskAvailable;
pObj->alternativeRole = pStatus->alternativeRole;
//
// if (mgmtProcessDnodeStatusFp) {
// mgmtProcessDnodeStatusFp(pStatus, pObj, pConn);
// return;
// }
if (pVload->vgId == 0 || pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { pObj->status = TSDB_DN_STATUS_READY;
mError("vid:%d, mgmt not exist, drop it", vnode);
pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING;
}
}
taosTmrReset(mgmtProcessDnodeStatus, tsStatusInterval * 1000, NULL, tsMgmtTmr, &mgmtStatusTimer); // // wait vnode dropped
if (mgmtStatusTimer == NULL) { // for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) {
mError("Failed to start status timer"); // SVnodeLoad *pVload = &(pObj->vload[vnode]);
} // if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
*/ // bool existInDnode = false;
// for (int32_t j = 0; j < pObj->openVnodes; ++j) {
// if (htonl(pStatus->load[j].vnode) == vnode) {
// existInDnode = true;
// break;
// }
// }
//
// if (!existInDnode) {
// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
// pVload->status = TSDB_VN_STATUS_OFFLINE;
// mgmtUpdateDnode(pObj);
// mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode);
// taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr);
// }
// } else if (pVload->vgId == 0) {
// /*
// * In some cases, vnode information may be reported abnormally, recover it
// */
// if (pVload->dropStatus != TSDB_VN_DROP_STATUS_READY || pVload->status != TSDB_VN_STATUS_OFFLINE) {
// mPrint("dnode:%s, vid:%d, vgroup:%d status:%s dropStatus:%s, set it to avail status",
// taosIpStr(pObj->privateIp), vnode, pVload->vgId, taosGetVnodeStatusStr(pVload->status),
// taosGetVnodeDropStatusStr(pVload->dropStatus));
// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
// pVload->status = TSDB_VN_STATUS_OFFLINE;
// mgmtUpdateDnode(pObj);
// }
// }
// }
} }
void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId) = mgmtProcessDnodeStatusImp;
...@@ -18,32 +18,79 @@ ...@@ -18,32 +18,79 @@
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
#include "mgmtGrant.h" #include "mgmtGrant.h"
#include "mgmtUser.h"
int32_t mgmtCheckUserGrantImp() { return 0; } int32_t (*mgmtCheckUserGrantFp)() = NULL;
int32_t (*mgmtCheckUserGrant)() = mgmtCheckUserGrantImp; int32_t (*mgmtCheckDbGrantFp)() = NULL;
void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL;
bool (*mgmtCheckExpiredFp)() = NULL;
int32_t (*mgmtGetGrantsMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL;
int32_t (*mgmtRetrieveGrantsFp)(SShowObj *pShow, char *data, int rows, void *pConn) = NULL;
void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL;
int32_t mgmtCheckDbGrantImp() { return 0; } int32_t mgmtCheckUserGrant() {
int32_t (*mgmtCheckDbGrant)() = mgmtCheckDbGrantImp; if (mgmtCheckUserGrantFp) {
return mgmtCheckUserGrantFp();
} else {
return 0;
}
}
int32_t mgmtCheckDbGrant() {
if (mgmtCheckDbGrantFp) {
return mgmtCheckDbGrantFp();
} else {
return 0;
}
}
void mgmtAddTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) { void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) {
pAcct->acctInfo.numOfTimeSeries += timeSeriesNum; pAcct->acctInfo.numOfTimeSeries += timeSeriesNum;
if (mgmtAddTimeSeriesFp) {
mgmtAddTimeSeriesFp(timeSeriesNum);
}
} }
void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtAddTimeSeriesImp;
void mgmtRestoreTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) { void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) {
pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum; pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum;
if (mgmtRestoreTimeSeriesFp) {
mgmtRestoreTimeSeriesFp(timeSeriesNum);
}
} }
void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtRestoreTimeSeriesImp;
int32_t mgmtCheckTimeSeriesImp(uint32_t timeseries) { return 0; }
int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries) = mgmtCheckTimeSeriesImp;
bool mgmtCheckExpiredImp() { return false; } int32_t mgmtCheckTimeSeries(uint32_t timeseries) {
bool (*mgmtCheckExpired)() = mgmtCheckExpiredImp; if (mgmtCheckTimeSeriesFp) {
return mgmtCheckTimeSeriesFp(timeseries);
} else {
return 0;
}
}
int32_t mgmtGetGrantsMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; } bool mgmtCheckExpired() {
int32_t (*mgmtGetGrantsMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetGrantsMetaImp; if (mgmtCheckExpiredFp) {
return mgmtCheckExpiredFp();
} else {
return false;
}
}
int32_t mgmtRetrieveGrantsImp(SShowObj *pShow, char *data, int rows, void *pConn) { return 0; } int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, void *pConn) = mgmtRetrieveGrantsImp; if (mgmtGetGrantsMetaFp) {
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
return mgmtGetGrantsMetaFp(pMeta, pShow, pConn);
} else {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
}
int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
if (mgmtRetrieveGrantsFp) {
return mgmtRetrieveGrantsFp(pShow, data, rows, pConn);
} else {
return 0;
}
}
...@@ -14,16 +14,134 @@ ...@@ -14,16 +14,134 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tschemautil.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtUser.h"
int32_t mgmtGetMnodeMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { void *(*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL;
return TSDB_CODE_OPS_NOT_SUPPORT; int32_t (*mgmtInitMnodesFp)() = NULL;
} int32_t (*mgmtGetMnodesNumFp)() = NULL;
static int32_t mgmtGetMnodesNum();
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode);
int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "IP");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
int32_t (*mgmtGetMnodeMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetMnodeMetaImp; pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "role");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = mgmtGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
int32_t mgmtRetrieveMnodesImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
return 0; return 0;
} }
int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveMnodesImp; int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
int32_t cols = 0;
SSdbPeer *pMnode = NULL;
char *pWrite;
char ipstr[20];
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextMnode(pShow, (SDnodeObj **)&pMnode);
// pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode);
// if (pMnode == NULL) break;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pMnode->ipstr);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pMnode->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, sdbStatusStr[(uint8_t)pMnode->status]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, sdbRoleStr[(uint8_t)pMnode->role]);
cols++;
tinet_ntoa(ipstr, pMnode->publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
static int32_t mgmtGetMnodesNum() {
if (mgmtGetMnodesNumFp) {
return mgmtGetMnodesNumFp();
} else {
return 1;
}
}
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) {
if (mgmtGetNextMnodeFp) {
return mgmtGetNextMnodeFp(pShow, pMnode);
} else {
if (*pMnode == NULL) {
*pMnode = NULL;
} else {
*pMnode = NULL;
}
}
return *pMnode;
}
\ No newline at end of file
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tstatus.h" #include "tstatus.h"
...@@ -51,6 +52,7 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL ...@@ -51,6 +52,7 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL
static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle);
static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
uint32_t mgmtAccessSquence = 0;
void *tsShellConnServer = NULL; void *tsShellConnServer = NULL;
void mgmtProcessTranRequest(SSchedMsg *sched) { void mgmtProcessTranRequest(SSchedMsg *sched) {
...@@ -1047,38 +1049,6 @@ static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *a ...@@ -1047,38 +1049,6 @@ static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *a
// rpcFreeCont(pCont); // rpcFreeCont(pCont);
} }
void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DB] = mgmtProcessCreateDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_DB] = mgmtProcessAlterDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DB] = mgmtProcessDropDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_USE_DB] = mgmtProcessUnSupportMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_USER] = mgmtProcessCreateUserMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_USER] = mgmtProcessAlterUserMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_USER] = mgmtProcessDropUserMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_ACCT] = mgmtProcessCreateAcctMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_ACCT] = mgmtProcessDropAcctMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_ACCT] = mgmtProcessAlterAcctMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_TABLE] = mgmtProcessCreateTableMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_TABLE] = mgmtProcessDropTableMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_TABLE] = mgmtProcessAlterTableMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DNODE] = mgmtProcessCreateDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DNODE] = mgmtProcessDropDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_CFG] = mgmtProcessCfgDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessUnSupportMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessTableMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiTableMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessSuperTableMetaMsg;
}
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
SDbObj *pDb = mgmtGetDb(pCreate->db); SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) { if (pDb == NULL) {
...@@ -1194,10 +1164,167 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) ...@@ -1194,10 +1164,167 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle)
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
} }
void (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) {
\ No newline at end of file if (!mgmtAlterAcctFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
return;
}
SAlterAcctMsg *pAlter = pCont;
pAlter->cfg.maxUsers = htonl(pAlter->cfg.maxUsers);
pAlter->cfg.maxDbs = htonl(pAlter->cfg.maxDbs);
pAlter->cfg.maxTimeSeries = htonl(pAlter->cfg.maxTimeSeries);
pAlter->cfg.maxConnections = htonl(pAlter->cfg.maxConnections);
pAlter->cfg.maxStreams = htonl(pAlter->cfg.maxStreams);
pAlter->cfg.maxPointsPerSecond = htonl(pAlter->cfg.maxPointsPerSecond);
pAlter->cfg.maxStorage = htobe64(pAlter->cfg.maxStorage);
pAlter->cfg.maxQueryTime = htobe64(pAlter->cfg.maxQueryTime);
pAlter->cfg.maxInbound = htobe64(pAlter->cfg.maxInbound);
pAlter->cfg.maxOutbound = htobe64(pAlter->cfg.maxOutbound);
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
mError("account:%s, failed to alter account, need redirect message", pAlter->user);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
if (pUser == NULL) {
mError("account:%s, failed to alter account, invalid user", pAlter->user);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("account:%s, failed to alter account, no rights", pAlter->user);
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
return;
}
int32_t code = mgmtAlterAcctFp(pAlter->user, pAlter->pass, &(pAlter->cfg));;
if (code == TSDB_CODE_SUCCESS) {
mLPrint("account:%s is altered by %s", pAlter->user, pUser->user);
} else {
mError("account:%s, failed to alter account, reason:%s", pAlter->user, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
}
static void mgmtProcessDropAcctMsg(void *pCont, int32_t contLen, void *ahandle) {
if (!mgmtDropAcctFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
return;
}
SDropAcctMsg *pDrop = (SDropAcctMsg *) pCont;
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
mError("account:%s, failed to drop account, need redirect message", pDrop->user);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
if (pUser == NULL) {
mError("account:%s, failed to drop account, invalid user", pDrop->user);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("account:%s, failed to drop account, no rights", pDrop->user);
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
return;
}
int32_t code = mgmtDropAcctFp(pDrop->user);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("account:%s is dropped by %s", pDrop->user, pUser->user);
} else {
mError("account:%s, failed to drop account, reason:%s", pDrop->user, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
}
static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle) {
if (!mgmtCreateAcctFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
return;
}
SCreateAcctMsg *pCreate = (SCreateAcctMsg *) pCont;
pCreate->cfg.maxUsers = htonl(pCreate->cfg.maxUsers);
pCreate->cfg.maxDbs = htonl(pCreate->cfg.maxDbs);
pCreate->cfg.maxTimeSeries = htonl(pCreate->cfg.maxTimeSeries);
pCreate->cfg.maxConnections = htonl(pCreate->cfg.maxConnections);
pCreate->cfg.maxStreams = htonl(pCreate->cfg.maxStreams);
pCreate->cfg.maxPointsPerSecond = htonl(pCreate->cfg.maxPointsPerSecond);
pCreate->cfg.maxStorage = htobe64(pCreate->cfg.maxStorage);
pCreate->cfg.maxQueryTime = htobe64(pCreate->cfg.maxQueryTime);
pCreate->cfg.maxInbound = htobe64(pCreate->cfg.maxInbound);
pCreate->cfg.maxOutbound = htobe64(pCreate->cfg.maxOutbound);
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
mError("account:%s, failed to create account, need redirect message", pCreate->user);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
if (pUser == NULL) {
mError("account:%s, failed to create account, invalid user", pCreate->user);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("account:%s, failed to create account, no rights", pCreate->user);
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
return;
}
int32_t code = mgmtCreateAcctFp(pCreate->user, pCreate->pass, &(pCreate->cfg));
if (code == TSDB_CODE_SUCCESS) {
mLPrint("account:%s is created by %s", pCreate->user, pUser->user);
} else {
mError("account:%s, failed to create account, reason:%s", pCreate->user, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
}
void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DB] = mgmtProcessCreateDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_DB] = mgmtProcessAlterDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DB] = mgmtProcessDropDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_USE_DB] = mgmtProcessUnSupportMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_USER] = mgmtProcessCreateUserMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_USER] = mgmtProcessAlterUserMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_USER] = mgmtProcessDropUserMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_ACCT] = mgmtProcessCreateAcctMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_ACCT] = mgmtProcessDropAcctMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_ACCT] = mgmtProcessAlterAcctMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_TABLE] = mgmtProcessCreateTableMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_TABLE] = mgmtProcessDropTableMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_TABLE] = mgmtProcessAlterTableMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DNODE] = mgmtProcessCreateDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DNODE] = mgmtProcessDropDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_CFG] = mgmtProcessCfgDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessUnSupportMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessTableMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiTableMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessSuperTableMetaMsg;
}
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosdef.h" #include "taosdef.h"
#include "tmodule.h"
#include "tsched.h" #include "tsched.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
...@@ -32,13 +33,11 @@ ...@@ -32,13 +33,11 @@
char tsMgmtDirectory[128] = {0}; char tsMgmtDirectory[128] = {0};
void *tsMgmtTmr = NULL; void *tsMgmtTmr = NULL;
void *tsMgmtTranQhandle = NULL; void *tsMgmtTranQhandle = NULL;
void *tsMgmtStatisTimer = NULL;
void mgmtCleanUpSystem() { void mgmtCleanUpSystem() {
mPrint("starting to clean up mgmt"); mPrint("starting to clean up mgmt");
taosTmrStopA(&tsMgmtStatisTimer);
mgmtCleanUpRedirect();
sdbCleanUpPeers(); sdbCleanUpPeers();
mgmtCleanupBalance(); mgmtCleanupBalance();
mgmtCleanUpDnodeInt(); mgmtCleanUpDnodeInt();
...@@ -55,6 +54,20 @@ void mgmtCleanUpSystem() { ...@@ -55,6 +54,20 @@ void mgmtCleanUpSystem() {
mPrint("mgmt is cleaned up"); mPrint("mgmt is cleaned up");
} }
int32_t mgmtCheckMgmtRunning() {
if (tsModuleStatus & (1 << TSDB_MOD_MGMT)) {
return -1;
}
tsetModuleStatus(TSDB_MOD_MGMT);
// strcpy(sdbMasterIp, mgmtIpStr[0]);
// strcpy(sdbPrivateIp, tsPrivateIp);
// sdbPublicIp = inet_addr(tsPublicIp);
return 0;
}
int32_t mgmtStartSystem() { int32_t mgmtStartSystem() {
mPrint("starting to initialize TDengine mgmt ..."); mPrint("starting to initialize TDengine mgmt ...");
...@@ -111,10 +124,10 @@ int32_t mgmtStartSystem() { ...@@ -111,10 +124,10 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (mgmtInitShell() < 0) { // if (mgmtInitShell() < 0) {
mError("failed to init shell"); // mError("failed to init shell");
return -1; // return -1;
} // }
if (sdbInitPeers(tsMgmtDirectory) < 0) { if (sdbInitPeers(tsMgmtDirectory) < 0) {
mError("failed to init peers"); mError("failed to init peers");
...@@ -125,39 +138,38 @@ int32_t mgmtStartSystem() { ...@@ -125,39 +138,38 @@ int32_t mgmtStartSystem() {
mError("failed to init dnode balance") mError("failed to init dnode balance")
} }
taosTmrReset(mgmtDoStatistic, tsStatusInterval * 30000, NULL, tsMgmtTmr, &tsMgmtStatisTimer);
mPrint("TDengine mgmt is initialized successfully"); mPrint("TDengine mgmt is initialized successfully");
return 0; return 0;
} }
int32_t mgmtInitSystemImp() { int32_t mgmtInitSystem() {
int32_t code = mgmtStartSystem(); struct stat dirstat;
if (code != 0) { bool directoryExist = (stat(tsMgmtDirectory, &dirstat) == 0);
return code; bool equalWithMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0);
}
taosTmrReset(mgmtProcessDnodeStatus, 500, NULL, tsMgmtTmr, &mgmtStatusTimer); if (equalWithMaster || directoryExist) {
return code; if (mgmtStartSystem() != 0) {
} return -1;
}
}
int32_t (*mgmtInitSystem)() = mgmtInitSystemImp; if (mgmtInitShell() < 0) {
mError("failed to init shell");
return -1;
}
int32_t mgmtCheckMgmtRunningImp() {
return 0; return 0;
} }
int32_t (*mgmtCheckMgmtRunning)() = mgmtCheckMgmtRunningImp; void mgmtStopSystem() {
if (sdbMaster) {
void mgmtDoStatisticImp(void *handle, void *tmrId) {} mTrace("it is a master mgmt node, it could not be stopped");
return;
void (*mgmtDoStatistic)(void *handle, void *tmrId) = mgmtDoStatisticImp; }
void mgmtStopSystemImp() {}
void (*mgmtStopSystem)() = mgmtStopSystemImp;
void mgmtCleanUpRedirectImp() {}
void (*mgmtCleanUpRedirect)() = mgmtCleanUpRedirectImp; mgmtCleanUpSystem();
remove(tsMgmtDirectory);
// mgmtInitRedirect();
}
...@@ -585,6 +585,7 @@ void tsPrintOsInfo() { ...@@ -585,6 +585,7 @@ void tsPrintOsInfo() {
pPrint(" os release: %s", buf.release); pPrint(" os release: %s", buf.release);
pPrint(" os version: %s", buf.version); pPrint(" os version: %s", buf.version);
pPrint(" os machine: %s", buf.machine); pPrint(" os machine: %s", buf.machine);
pPrint("==================================");
} }
void taosKillSystem() { void taosKillSystem() {
......
...@@ -48,7 +48,7 @@ typedef struct { ...@@ -48,7 +48,7 @@ typedef struct {
char spi:3; // security parameter index char spi:3; // security parameter index
char encrypt:3; // encrypt algorithm, 0: no encryption char encrypt:3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID uint16_t tranId; // transcation ID
uint32_t uid; // for unique ID inside a client uint32_t linkUid; // for unique connection ID assigned by client
uint32_t sourceId; // source ID, an index for connection list uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario uint32_t destIp; // destination IP address, for NAT scenario
......
...@@ -94,8 +94,9 @@ typedef struct _RpcConn { ...@@ -94,8 +94,9 @@ typedef struct _RpcConn {
char encrypt; // encryption, 0:1 char encrypt; // encryption, 0:1
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
char secured; // if set to 1, no authentication
uint16_t localPort; // for UDP only uint16_t localPort; // for UDP only
uint32_t peerUid; // peer UID uint32_t linkUid; // connection unique ID assigned by client
uint32_t peerIp; // peer IP uint32_t peerIp; // peer IP
uint32_t destIp; // server destination IP to handle NAT uint32_t destIp; // server destination IP to handle NAT
uint16_t peerPort; // peer port uint16_t peerPort; // peer port
...@@ -264,7 +265,7 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -264,7 +265,7 @@ void *rpcOpen(SRpcInit *pInit) {
return NULL; return NULL;
} }
} else { } else {
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime);
if ( pRpc->pCache == NULL ) { if ( pRpc->pCache == NULL ) {
tError("%s failed to init connection cache", pRpc->label); tError("%s failed to init connection cache", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
...@@ -399,10 +400,9 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -399,10 +400,9 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->uid = 0; pHead->linkUid = pConn->linkUid;
pHead->port = htons(pConn->localPort); pHead->port = htons(pConn->localPort);
pHead->code = htonl(code); pHead->code = htonl(code);
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set pConn parameters // set pConn parameters
pConn->inType = 0; pConn->inType = 0;
...@@ -417,6 +417,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -417,6 +417,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
rpcSendMsgToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
pConn->secured = 1; // connection shall be secured
return; return;
} }
...@@ -499,7 +500,7 @@ static void rpcCloseConn(void *thandle) { ...@@ -499,7 +500,7 @@ static void rpcCloseConn(void *thandle) {
if ( pRpc->connType == TAOS_CONN_SERVER) { if ( pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0}; char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->peerUid, pConn->peerId, pConn->connType); sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
taosDeleteStrHash(pRpc->hash, hashstr); taosDeleteStrHash(pRpc->hash, hashstr);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
...@@ -535,6 +536,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { ...@@ -535,6 +536,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn->sid = sid; pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
pConn->spi = pRpc->spi; pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt; pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
...@@ -548,7 +550,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -548,7 +550,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
char hashstr[40] = {0}; char hashstr[40] = {0};
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType); sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
// check if it is already allocated // check if it is already allocated
SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
...@@ -567,6 +569,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -567,6 +569,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->sid = sid; pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
pConn->linkUid = pHead->linkUid;
if (pRpc->afp && (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey) < 0) { if (pRpc->afp && (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey) < 0) {
tWarn("%s %p, user not there", pRpc->label, pConn); tWarn("%s %p, user not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released taosFreeId(pRpc->idPool, sid); // sid shall be released
...@@ -601,8 +604,8 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { ...@@ -601,8 +604,8 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
} }
if (pConn) { if (pConn) {
if (memcmp(pConn->user, pHead->user, tListLen(pConn->user)) != 0) { if (pConn->linkUid != pHead->linkUid) {
tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, pHead->user); tTrace("%s %p, linkUid:0x%x not matched, received:0x%x", pRpc->label, pConn, pConn->linkUid, pHead->linkUid);
terrno = TSDB_CODE_MISMATCHED_METER_ID; terrno = TSDB_CODE_MISMATCHED_METER_ID;
pConn = NULL; pConn = NULL;
} }
...@@ -748,7 +751,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -748,7 +751,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if (pRecv->port) pConn->peerPort = pRecv->port; if (pRecv->port) pConn->peerPort = pRecv->port;
if (pHead->port) pConn->peerPort = htons(pHead->port); if (pHead->port) pConn->peerPort = htons(pHead->port);
if (pHead->uid) pConn->peerUid = pHead->uid;
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
...@@ -813,7 +815,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -813,7 +815,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
} }
if (pConn && pRpc->idleTime) { if (pRpc->connType == TAOS_CONN_SERVER && pConn && pRpc->idleTime) {
// only for server, starts the idle timer. For client, it is started by cache mgmt
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
} }
...@@ -881,7 +884,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { ...@@ -881,7 +884,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->uid = 0; pHead->linkUid = pConn->linkUid;
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code); pHead->code = htonl(code);
...@@ -905,7 +908,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { ...@@ -905,7 +908,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
pReplyHead->tranId = pRecvHead->tranId; pReplyHead->tranId = pRecvHead->tranId;
pReplyHead->sourceId = pRecvHead->destId; pReplyHead->sourceId = pRecvHead->destId;
pReplyHead->destId = pRecvHead->sourceId; pReplyHead->destId = pRecvHead->sourceId;
memcpy(pReplyHead->user, pRecvHead->user, tListLen(pReplyHead->user)); pReplyHead->linkUid = pRecvHead->linkUid;
pReplyHead->code = htonl(code); pReplyHead->code = htonl(code);
msgLen = sizeof(SRpcHead); msgLen = sizeof(SRpcHead);
...@@ -951,8 +954,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -951,8 +954,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->destIp = pConn->destIp; pHead->destIp = pConn->destIp;
pHead->port = 0; pHead->port = 0;
pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); pHead->linkUid = pConn->linkUid;
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set the connection parameters // set the connection parameters
pConn->outType = msgType; pConn->outType = msgType;
...@@ -1026,8 +1029,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1026,8 +1029,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pConn->retry++; pConn->retry++;
if (pConn->retry < 4) { if (pConn->retry < 4) {
tTrace("%s %p, re-send msg:%s to %s:%hu retry:%d", pRpc->label, pConn, tTrace("%s %p, re-send msg:%s to %s:%hud", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn->retry); taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else { } else {
...@@ -1179,7 +1182,7 @@ static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) { ...@@ -1179,7 +1182,7 @@ static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
if (pConn->spi) { if (pConn->spi && pConn->secured == 0) {
// add auth part // add auth part
pHead->spi = pConn->spi; pHead->spi = pConn->spi;
SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen); SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
...@@ -1188,6 +1191,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1188,6 +1191,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else { } else {
pHead->spi = 0;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
} }
...@@ -1197,9 +1201,10 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1197,9 +1201,10 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
int32_t code = 0; int code = 0;
if (pConn->spi == 0) { if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){
// secured link, or no authentication
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
return 0; return 0;
} }
...@@ -1214,7 +1219,6 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1214,7 +1219,6 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
} }
code = 0; code = 0;
if (pHead->spi == pConn->spi) { if (pHead->spi == pConn->spi) {
// authentication // authentication
SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest)); SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));
...@@ -1231,6 +1235,8 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1231,6 +1235,8 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
code = TSDB_CODE_AUTH_FAILURE; code = TSDB_CODE_AUTH_FAILURE;
} else { } else {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client
tTrace("%s %p, message is authenticated", pRpc->label, pConn);
} }
} }
} else { } else {
...@@ -1243,7 +1249,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1243,7 +1249,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
static void rpcLockConn(SRpcConn *pConn) { static void rpcLockConn(SRpcConn *pConn) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetPthreadId();
int i = 0; int i = 0;
while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) { while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
if (++i % 1000 == 0) { if (++i % 1000 == 0) {
sched_yield(); sched_yield();
......
...@@ -106,11 +106,12 @@ int main(int argc, char *argv[]) { ...@@ -106,11 +106,12 @@ int main(int argc, char *argv[]) {
rpcInit.cfp = processResponse; rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdateIpSet; rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 100; rpcInit.sessions = 100;
rpcInit.idleTime = 2000; rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael"; rpcInit.user = "michael";
rpcInit.secret = "mypassword"; rpcInit.secret = "mypassword";
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
...@@ -159,8 +160,8 @@ int main(int argc, char *argv[]) { ...@@ -159,8 +160,8 @@ int main(int argc, char *argv[]) {
} }
} }
rpcInit.connType = TAOS_CONN_CLIENT;
taosInitLog("client.log", 100000, 10); taosInitLog("client.log", 100000, 10);
tPrint("rpcDebugFlag:%d", rpcDebugFlag);
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
...@@ -200,7 +201,7 @@ int main(int argc, char *argv[]) { ...@@ -200,7 +201,7 @@ int main(int argc, char *argv[]) {
tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
taosCloseLog(); taosCloseLogger();
return 0; return 0;
} }
......
...@@ -110,7 +110,7 @@ int main(int argc, char *argv[]) { ...@@ -110,7 +110,7 @@ int main(int argc, char *argv[]) {
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg; rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000; rpcInit.sessions = 1000;
rpcInit.idleTime = 2000; rpcInit.idleTime = tsShellActivityTimer*1500;
rpcInit.afp = retrieveAuthInfo; rpcInit.afp = retrieveAuthInfo;
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
......
...@@ -193,7 +193,6 @@ extern char tsCharset[64]; // default encode string ...@@ -193,7 +193,6 @@ extern char tsCharset[64]; // default encode string
// //
void tsReadGlobalLogConfig(); void tsReadGlobalLogConfig();
bool tsReadGlobalConfig(); bool tsReadGlobalConfig();
bool tsReadGlobalConfigSpec();
int tsCfgDynamicOptions(char *msg); int tsCfgDynamicOptions(char *msg);
void tsPrintGlobalConfig(); void tsPrintGlobalConfig();
void tsPrintGlobalConfigSpec(); void tsPrintGlobalConfigSpec();
......
...@@ -24,6 +24,9 @@ ...@@ -24,6 +24,9 @@
#include "tsystem.h" #include "tsystem.h"
#include "tutil.h" #include "tutil.h"
void (*tsReadStorageConfig)() = NULL;
void (*tsPrintStorageConfig)() = NULL;
// monitor module api // monitor module api
int (*startMonitor)() = NULL; int (*startMonitor)() = NULL;
void (*stopMonitor)() = NULL; void (*stopMonitor)() = NULL;
...@@ -942,7 +945,9 @@ bool tsReadGlobalConfig() { ...@@ -942,7 +945,9 @@ bool tsReadGlobalConfig() {
fclose(fp); fclose(fp);
} }
tsReadGlobalConfigSpec(); if (tsReadStorageConfig) {
tsReadStorageConfig();
}
if (tsPrivateIp[0] == 0) { if (tsPrivateIp[0] == 0) {
taosGetPrivateIp(tsPrivateIp); taosGetPrivateIp(tsPrivateIp);
...@@ -1111,11 +1116,13 @@ void tsPrintGlobalConfig() { ...@@ -1111,11 +1116,13 @@ void tsPrintGlobalConfig() {
} }
} }
tsPrintGlobalConfigSpec(); if (tsPrintStorageConfig) {
tsPrintStorageConfig();
} else {
pPrint(" dataDir: %s", dataDir);
}
tsPrintOsInfo(); tsPrintOsInfo();
pPrint("==================================");
} }
void tsSetAllDebugFlag() { void tsSetAllDebugFlag() {
...@@ -1206,12 +1213,3 @@ void tsSetTimeZone() { ...@@ -1206,12 +1213,3 @@ void tsSetTimeZone() {
pPrint("timezone format changed to %s", tsTimezone); pPrint("timezone format changed to %s", tsTimezone);
} }
#ifndef CLUSTER
bool tsReadGlobalConfigSpec() { return true; }
void tsPrintGlobalConfigSpec() {
pPrint(" dataDir: %s", dataDir);
}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册