提交 79341c73 编写于 作者: S slguan

reorgnize mgmtDnode.c

上级 46641bad
......@@ -24,34 +24,6 @@ extern "C" {
#include <stdbool.h>
#include <pthread.h>
#define tsetModuleStatus(mod) \
{ tsModuleStatus |= (1 << mod); }
#define tclearModuleStatus(mod) \
{ tsModuleStatus &= ~(1 << mod); }
enum _module {
TSDB_MOD_MGMT,
TSDB_MOD_HTTP,
TSDB_MOD_MONITOR,
TSDB_MOD_DCLUSTER,
TSDB_MOD_MSTORAGE,
TSDB_MOD_MAX
};
typedef struct {
char *name;
int (*initFp)();
void (*cleanUpFp)();
int (*startFp)();
void (*stopFp)();
int num;
int curNum;
int equalVnodeNum;
} SModule;
extern uint32_t tsModuleStatus;
extern SModule tsModule[];
void dnodeAllocModules();
int32_t dnodeInitModules();
void dnodeCleanUpModules();
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "tmodule.h"
#include "tglobalcfg.h"
#include "mnode.h"
#include "http.h"
......@@ -23,9 +24,6 @@
#include "dnodeModule.h"
#include "dnodeSystem.h"
SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus = 0;
void dnodeAllocModules() {
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
......
......@@ -19,6 +19,7 @@
#include "taoserror.h"
#include "tcrc32c.h"
#include "tlog.h"
#include "tmodule.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimer.h"
......
......@@ -23,25 +23,24 @@ extern "C" {
#include "mnode.h"
void mgmtMonitorDbDrop(void *unused, void *unusedt);
int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter);
int mgmtUseDb(SConnObj *pConn, char *name);
int mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup);
int mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup);
int mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup);
int mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup);
int mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup);
int mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtRetrieveDbs(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter);
int32_t mgmtUseDb(SConnObj *pConn, char *name);
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
void mgmtCleanUpDbs();
int32_t mgmtInitDbs();
int mgmtUpdateDb(SDbObj *pDb);
int32_t mgmtUpdateDb(SDbObj *pDb);
SDbObj *mgmtGetDb(char *db);
SDbObj *mgmtGetDbByMeterId(char *db);
int mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate);
int mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists);
int mgmtDropDb(SDbObj *pDb);
SDbObj *mgmtGetDbByTableId(char *db);
int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate);
int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists);
int32_t mgmtDropDb(SDbObj *pDb);
#ifdef __cplusplus
}
......
......@@ -28,21 +28,21 @@ int32_t mgmtCreateDnode(uint32_t ip);
int32_t mgmtDropDnode(SDnodeObj *pDnode);
int32_t mgmtDropDnodeByIp(uint32_t ip);
int32_t mgmtGetNextVnode(SVnodeGid *pVnodeGid);
void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes, int vgId);
void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes);
void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId);
void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes);
int32_t mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtSendCfgDnodeMsg(char *cont);
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode);
int32_t mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
extern int32_t (*mgmtInitDnodes)();
extern void (*mgmtCleanUpDnodes)();
......@@ -52,10 +52,10 @@ extern void* (*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode);
extern int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode);
extern void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode);
extern int32_t (*mgmtGetScoresMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
extern int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
extern int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
extern bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg);
extern SDnodeObj dnodeObj;
extern SDnodeObj tsDnodeObj;
#ifdef __cplusplus
}
......
......@@ -24,16 +24,15 @@ extern "C" {
#include <stdbool.h>
#include "mnode.h"
int mgmtInitVgroups();
SVgObj *mgmtGetVgroup(int vgId);
int32_t mgmtInitVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtCreateVgroup(SDbObj *pDb);
int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup);
void mgmtSetVgroupIdPool();
int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
void mgmtCleanUpVgroups();
SVgObj *mgmtGetAvailVgroup(SDbObj *pDb);
int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup);
......
......@@ -24,7 +24,7 @@
extern void *tsUserSdb;
extern void *tsDbSdb;
SAcctObj acctObj;
static SAcctObj tsAcctObj;
int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb) {
pthread_mutex_lock(&pAcct->mutex);
......@@ -103,7 +103,7 @@ int32_t mgmtInitAcctsImp() {
int32_t (*mgmtInitAccts)() = mgmtInitAcctsImp;
SAcctObj *mgmtGetAcctImp(char *acctName) {
return &acctObj;
return &tsAcctObj;
}
SAcctObj *(*mgmtGetAcct)(char *acctName) = mgmtGetAcctImp;
......@@ -137,7 +137,7 @@ int32_t mgmtCheckTableLimitImp(SAcctObj *pAcct, SCreateTableMsg *pCreate) {
int32_t (*mgmtCheckTableLimit)(SAcctObj *pAcct, SCreateTableMsg *pCreate) = mgmtCheckTableLimitImp;
void mgmtCheckAcctImp() {
SAcctObj *pAcct = &acctObj;
SAcctObj *pAcct = &tsAcctObj;
pAcct->acctId = 0;
strcpy(pAcct->user, "root");
......
......@@ -14,12 +14,12 @@
*/
#define _DEFAULT_SOURCE
#include "mgmtBalance.h"
#include "mgmtDnode.h"
#include "dnodeModule.h"
#include "tstatus.h"
#include "tglobalcfg.h"
#include "tmodule.h"
#include "tstatus.h"
#include "ttime.h"
#include "mgmtBalance.h"
#include "mgmtDnode.h"
void mgmtStartBalanceTimerImp(int64_t mseconds) {}
void (*mgmtStartBalanceTimer)(int64_t mseconds) = mgmtStartBalanceTimerImp;
......@@ -32,7 +32,7 @@ void (*mgmtCleanupBalance)() = mgmtCleanupBalanceImp;
int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) {
int selectedVnode = -1;
SDnodeObj *pDnode = &dnodeObj;
SDnodeObj *pDnode = &tsDnodeObj;
int lastAllocVode = pDnode->lastAllocVnode;
for (int i = 0; i < pDnode->numOfVnodes; i++) {
......@@ -59,21 +59,34 @@ int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup) = mgmtAllocVnodesImp;
bool mgmtCheckModuleInDnodeImp(SDnodeObj *pDnode, int moduleType) {
return tsModule[moduleType].num != 0;
}
bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType) = mgmtCheckModuleInDnodeImp;
char *mgmtGetVnodeStatusImp(SVgObj *pVgroup, SVnodeGid *pVnode) { return "master"; }
char *mgmtGetVnodeStatusImp(SVgObj *pVgroup, SVnodeGid *pVnode) {
return "master";
}
char *(*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtGetVnodeStatusImp;
bool mgmtCheckVnodeReadyImp(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { return true; }
bool mgmtCheckVnodeReadyImp(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) {
return true;
}
bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtCheckVnodeReadyImp;
void mgmtUpdateDnodeStateImp(SDnodeObj *pDnode, int lbStatus) {
}
void mgmtUpdateDnodeStateImp(SDnodeObj *pDnode, int lbStatus) {}
void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus) = mgmtUpdateDnodeStateImp;
void mgmtUpdateVgroupStateImp(SVgObj *pVgroup, int lbStatus, int srcIp) {}
void mgmtUpdateVgroupStateImp(SVgObj *pVgroup, int lbStatus, int srcIp) {
}
void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp) = mgmtUpdateVgroupStateImp;
bool mgmtAddVnodeImp(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { return false; }
bool mgmtAddVnodeImp(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
return false;
}
bool (*mgmtAddVnode)(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) = mgmtAddVnodeImp;
......@@ -15,35 +15,33 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mgmtDnode.h"
#include "mgmtDb.h"
#include "taoserror.h"
#include "tschemautil.h"
#include "tstatus.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtGrant.h"
#include "mgmtBalance.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtTable.h"
#include "mgmtUtil.h"
#include "mgmtVgroup.h"
#include "mgmtTable.h"
#include "tschemautil.h"
#include "tstatus.h"
#include "mnode.h"
#include "taoserror.h"
extern void *tsVgroupSdb;
void *tsDbSdb = NULL;
extern void *vgSdb;
int tsDbUpdateSize;
void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *mgmtDbActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtDbActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtDbActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtDbActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtDbActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtDbActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtDbActionBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtDbActionAfterBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtDbActionReset(void *row, char *str, int size, int *ssize);
void *mgmtDbActionDestroy(void *row, char *str, int size, int *ssize);
int32_t tsDbUpdateSize;
void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
void mgmtDbActionInit() {
mgmtDbActionFp[SDB_TYPE_INSERT] = mgmtDbActionInsert;
......@@ -51,31 +49,17 @@ void mgmtDbActionInit() {
mgmtDbActionFp[SDB_TYPE_UPDATE] = mgmtDbActionUpdate;
mgmtDbActionFp[SDB_TYPE_ENCODE] = mgmtDbActionEncode;
mgmtDbActionFp[SDB_TYPE_DECODE] = mgmtDbActionDecode;
mgmtDbActionFp[SDB_TYPE_BEFORE_BATCH_UPDATE] = mgmtDbActionBeforeBatchUpdate;
mgmtDbActionFp[SDB_TYPE_BATCH_UPDATE] = mgmtDbActionBatchUpdate;
mgmtDbActionFp[SDB_TYPE_AFTER_BATCH_UPDATE] = mgmtDbActionAfterBatchUpdate;
mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset;
mgmtDbActionFp[SDB_TYPE_DESTROY] = mgmtDbActionDestroy;
}
void *mgmtDbAction(char action, void *row, char *str, int size, int *ssize) {
void *mgmtDbAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
if (mgmtDbActionFp[(uint8_t)action] != NULL) {
return (*(mgmtDbActionFp[(uint8_t)action]))(row, str, size, ssize);
}
return NULL;
}
void mgmtGetAcctStr(char *src, char *dest) {
char *pos = strstr(src, TS_PATH_DELIMITER);
while ((pos != NULL) && (*src != *pos)) {
*dest = *src;
src++;
dest++;
}
*dest = 0;
}
int32_t mgmtInitDbs() {
void * pNode = NULL;
SDbObj * pDb = NULL;
......@@ -118,9 +102,11 @@ int32_t mgmtInitDbs() {
return 0;
}
SDbObj *mgmtGetDb(char *db) { return (SDbObj *)sdbGetRow(tsDbSdb, db); }
SDbObj *mgmtGetDb(char *db) {
return (SDbObj *)sdbGetRow(tsDbSdb, db);
}
SDbObj *mgmtGetDbByMeterId(char *meterId) {
SDbObj *mgmtGetDbByTableId(char *meterId) {
char db[TSDB_TABLE_ID_LEN], *pos;
pos = strstr(meterId, TS_PATH_DELIMITER);
......@@ -131,7 +117,7 @@ SDbObj *mgmtGetDbByMeterId(char *meterId) {
return (SDbObj *)sdbGetRow(tsDbSdb, db);
}
int mgmtCheckDbParams(SCreateDbMsg *pCreate) {
int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) {
// assign default parameters
if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode; //
if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize; //
......@@ -176,16 +162,13 @@ int mgmtCheckDbParams(SCreateDbMsg *pCreate) {
return TSDB_CODE_SUCCESS;
}
int mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) {
SDbObj *pDb;
int code;
code = mgmtCheckDbLimit(pAcct);
int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) {
int32_t code = mgmtCheckDbLimit(pAcct);
if (code != 0) {
return code;
}
pDb = (SDbObj *)sdbGetRow(tsDbSdb, pCreate->db);
SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, pCreate->db);
if (pDb != NULL) {
return TSDB_CODE_DB_ALREADY_EXIST;
}
......@@ -215,14 +198,16 @@ int mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) {
return code;
}
int mgmtUpdateDb(SDbObj *pDb) { return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1); }
int32_t mgmtUpdateDb(SDbObj *pDb) {
return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1);
}
int mgmtSetDbDropping(SDbObj *pDb) {
int32_t mgmtSetDbDropping(SDbObj *pDb) {
if (pDb->dropStatus == TSDB_DB_STATUS_DROP_FROM_SDB) return 0;
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
for (int i = 0; i < pVgroup->numOfVnodes; i++) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) {
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i;
SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip);
if (pDnode == NULL) continue;
......@@ -257,7 +242,7 @@ int mgmtSetDbDropping(SDbObj *pDb) {
bool mgmtCheckDropDbFinished(SDbObj *pDb) {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup) {
for (int i = 0; i < pVgroup->numOfVnodes; i++) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) {
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i;
SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip);
......@@ -292,7 +277,7 @@ void mgmtDropDbFromSdb(SDbObj *pDb) {
mPrint("db:%s database drop finished", pDb->name);
}
int mgmtDropDb(SDbObj *pDb) {
int32_t mgmtDropDb(SDbObj *pDb) {
if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) {
bool finished = mgmtCheckDropDbFinished(pDb);
if (!finished) {
......@@ -309,15 +294,14 @@ int mgmtDropDb(SDbObj *pDb) {
mgmtDropDbFromSdb(pDb);
return 0;
} else {
int code = mgmtSetDbDropping(pDb);
int32_t code = mgmtSetDbDropping(pDb);
if (code != 0) return code;
return TSDB_CODE_ACTION_IN_PROGRESS;
}
}
int mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) {
SDbObj *pDb;
pDb = (SDbObj *)sdbGetRow(tsDbSdb, name);
int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) {
SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name);
if (pDb == NULL) {
if (ignoreNotExists) return TSDB_CODE_SUCCESS;
mWarn("db:%s is not there", name);
......@@ -344,17 +328,16 @@ void mgmtMonitorDbDrop(void *unused, void *unusedt) {
}
}
int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
SDbObj *pDb;
int code = TSDB_CODE_SUCCESS;
int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
int32_t code = TSDB_CODE_SUCCESS;
pDb = (SDbObj *)sdbGetRow(tsDbSdb, pAlter->db);
SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db);
if (pDb == NULL) {
mTrace("db:%s is not exist", pAlter->db);
return TSDB_CODE_INVALID_DB;
}
int oldReplicaNum = pDb->cfg.replications;
int32_t oldReplicaNum = pDb->cfg.replications;
if (pAlter->daysToKeep > 0) {
mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep);
pDb->cfg.daysToKeep = pAlter->daysToKeep;
......@@ -400,7 +383,7 @@ int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
}
if (pAlter->maxSessions > 0) {
//rebuild meterList in mgmtVgroup.c
sdbUpdateRow(vgSdb, pVgroup, tsVgUpdateSize, 0);
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 0);
}
mgmtSendVPeersMsg(pVgroup);
pVgroup = pVgroup->next;
......@@ -410,9 +393,9 @@ int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
return code;
}
int mgmtUseDb(SConnObj *pConn, char *name) {
int32_t mgmtUseDb(SConnObj *pConn, char *name) {
SDbObj *pDb;
int code = TSDB_CODE_INVALID_DB;
int32_t code = TSDB_CODE_INVALID_DB;
// here change the default db for connect.
pDb = mgmtGetDb(name);
......@@ -424,12 +407,11 @@ int mgmtUseDb(SConnObj *pConn, char *name) {
return code;
}
int mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) {
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) {
pVgroup->next = pDb->pHead;
pVgroup->prev = NULL;
if (pDb->pHead) pDb->pHead->prev = pVgroup;
if (pDb->pTail == NULL) pDb->pTail = pVgroup;
pDb->pHead = pVgroup;
......@@ -438,12 +420,11 @@ int mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) {
return 0;
}
int mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) {
int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) {
pVgroup->next = NULL;
pVgroup->prev = pDb->pTail;
if (pDb->pTail) pDb->pTail->next = pVgroup;
if (pDb->pHead == NULL) pDb->pHead = pVgroup;
pDb->pTail = pVgroup;
......@@ -452,46 +433,36 @@ int mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) {
return 0;
}
int mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup) {
int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup) {
if (pVgroup->prev) pVgroup->prev->next = pVgroup->next;
if (pVgroup->next) pVgroup->next->prev = pVgroup->prev;
if (pVgroup->prev == NULL) pDb->pHead = pVgroup->next;
if (pVgroup->next == NULL) pDb->pTail = pVgroup->prev;
pDb->numOfVgroups--;
return 0;
}
int mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup) {
int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup) {
mgmtRemoveVgroupFromDb(pDb, pVgroup);
mgmtAddVgroupIntoDbTail(pDb, pVgroup);
return 0;
}
int mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup) {
int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup) {
mgmtRemoveVgroupFromDb(pDb, pVgroup);
mgmtAddVgroupIntoDb(pDb, pVgroup);
return 0;
}
int mgmtShowTables(SAcctObj *pAcct, char *db) {
int code;
code = 0;
return code;
void mgmtCleanUpDbs() {
sdbCloseTable(tsDbSdb);
}
void mgmtCleanUpDbs() { sdbCloseTable(tsDbSdb); }
int mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta);
......@@ -619,11 +590,12 @@ int mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
// pShow->numOfRows = sdbGetNumOfRows (tsDbSdb);
pShow->numOfRows = pConn->pAcct->acctInfo.numOfDbs;
pShow->pNode = pConn->pAcct->pHead;
......@@ -636,11 +608,11 @@ char *mgmtGetDbStr(char *src) {
return ++pos;
}
int mgmtRetrieveDbs(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t numOfRows = 0;
SDbObj *pDb = NULL;
char * pWrite;
int cols = 0;
int32_t cols = 0;
while (numOfRows < rows) {
pDb = (SDbObj *)pShow->pNode;
......@@ -753,8 +725,8 @@ int mgmtRetrieveDbs(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
return numOfRows;
}
void *mgmtDbActionInsert(void *row, char *str, int size, int *ssize) {
SDbObj * pDb = (SDbObj *)row;
void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
SDbObj *pDb = (SDbObj *) row;
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
pDb->pHead = NULL;
......@@ -767,19 +739,22 @@ void *mgmtDbActionInsert(void *row, char *str, int size, int *ssize) {
return NULL;
}
void *mgmtDbActionDelete(void *row, char *str, int size, int *ssize) {
SDbObj * pDb = (SDbObj *)row;
void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
SDbObj *pDb = (SDbObj *) row;
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mgmtRemoveDbFromAcct(pAcct, pDb);
return NULL;
}
void *mgmtDbActionUpdate(void *row, char *str, int size, int *ssize) {
void *mgmtDbActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
return mgmtDbActionReset(row, str, size, ssize);
}
void *mgmtDbActionEncode(void *row, char *str, int size, int *ssize) {
SDbObj *pDb = (SDbObj *)row;
int tsize = pDb->updateEnd - (char *)pDb;
void *mgmtDbActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
SDbObj *pDb = (SDbObj *) row;
int32_t tsize = pDb->updateEnd - (char *) pDb;
if (size < tsize) {
*ssize = -1;
} else {
......@@ -789,27 +764,26 @@ void *mgmtDbActionEncode(void *row, char *str, int size, int *ssize) {
return NULL;
}
void *mgmtDbActionDecode(void *row, char *str, int size, int *ssize) {
SDbObj *pDb = (SDbObj *)malloc(sizeof(SDbObj));
void *mgmtDbActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
SDbObj *pDb = (SDbObj *) malloc(sizeof(SDbObj));
if (pDb == NULL) return NULL;
memset(pDb, 0, sizeof(SDbObj));
int tsize = pDb->updateEnd - (char *)pDb;
int32_t tsize = pDb->updateEnd - (char *)pDb;
memcpy(pDb, str, tsize);
return (void *)pDb;
}
void *mgmtDbActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; }
void *mgmtDbActionBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; }
void *mgmtDbActionAfterBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; }
void *mgmtDbActionReset(void *row, char *str, int size, int *ssize) {
SDbObj *pDb = (SDbObj *)row;
int tsize = pDb->updateEnd - (char *)pDb;
void *mgmtDbActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SDbObj *pDb = (SDbObj *) row;
int32_t tsize = pDb->updateEnd - (char *) pDb;
memcpy(pDb, str, tsize);
return NULL;
}
void *mgmtDbActionDestroy(void *row, char *str, int size, int *ssize) {
void *mgmtDbActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
tfree(row);
return NULL;
}
......@@ -14,19 +14,18 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "dnodeSystem.h"
#include "tmodule.h"
#include "tschemautil.h"
#include "tstatus.h"
#include "mnode.h"
#include "mgmtDnode.h"
#include "mgmtBalance.h"
#include "tschemautil.h"
#include "tstatus.h"
#include "dnodeModule.h"
SDnodeObj tsDnodeObj;
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
int maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore;
int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore;
maxVnodes = maxVnodes > TSDB_MAX_VNODES ? TSDB_MAX_VNODES : maxVnodes;
maxVnodes = maxVnodes < TSDB_MIN_VNODES ? TSDB_MIN_VNODES : maxVnodes;
if (pDnode->numOfTotalVnodes != 0) {
......@@ -39,19 +38,14 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
pDnode->numOfVnodes = maxVnodes;
pDnode->numOfFreeVnodes = maxVnodes;
pDnode->openVnodes = 0;
#ifdef CLUSTER
pDnode->status = TSDB_DN_STATUS_OFFLINE;
#else
pDnode->status = TSDB_DN_STATUS_READY;
#endif
}
void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) {
int totalVnodes = 0;
int32_t totalVnodes = 0;
mTrace("dnode:%s, begin calc free vnodes", taosIpStr(pDnode->privateIp));
for (int i = 0; i < pDnode->numOfVnodes; ++i) {
for (int32_t i = 0; i < pDnode->numOfVnodes; ++i) {
SVnodeLoad *pVload = pDnode->vload + i;
if (pVload->vgId != 0) {
mTrace("%d-dnode:%s, calc free vnodes, exist vnode:%d, vgroup:%d, state:%d %s, dropstate:%d %s, syncstatus:%d %s",
......@@ -68,10 +62,10 @@ void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) {
taosIpStr(pDnode->privateIp), pDnode->numOfVnodes, pDnode->numOfFreeVnodes, totalVnodes);
}
void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes, int vgId) {
void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) {
SDnodeObj *pDnode;
for (int i = 0; i < numOfVnodes; ++i) {
for (int32_t i = 0; i < numOfVnodes; ++i) {
pDnode = mgmtGetDnode(vnodeGid[i].ip);
if (pDnode) {
SVnodeLoad *pVload = pDnode->vload + vnodeGid[i].vnode;
......@@ -86,10 +80,10 @@ void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes, int vgId) {
}
}
void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes) {
void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes) {
SDnodeObj *pDnode;
for (int i = 0; i < numOfVnodes; ++i) {
for (int32_t i = 0; i < numOfVnodes; ++i) {
pDnode = mgmtGetDnode(vnodeGid[i].ip);
if (pDnode) {
SVnodeLoad *pVload = pDnode->vload + vnodeGid[i].vnode;
......@@ -102,8 +96,8 @@ void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes) {
}
}
int mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
int32_t mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
......@@ -155,7 +149,7 @@ int mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = mgmtGetDnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
......@@ -164,11 +158,11 @@ int mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int cols = 0;
char *pWrite;
int32_t cols = 0;
char ipstr[20];
while (numOfRows < rows) {
......@@ -214,8 +208,8 @@ int mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
return numOfRows;
}
int mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
int32_t mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
......@@ -243,14 +237,16 @@ int mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = 0;
SDnodeObj *pDnode = NULL;
while (1) {
pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
if (mgmtCheckModuleInDnode(pDnode, moduleType)) {
pShow->numOfRows++;
}
......@@ -263,18 +259,18 @@ int mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int mgmtRetrieveModules(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int cols = 0;
int32_t cols = 0;
char ipstr[20];
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
if (!mgmtCheckModuleInDnode(pDnode, moduleType)) {
continue;
}
......@@ -302,8 +298,8 @@ int mgmtRetrieveModules(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
return numOfRows;
}
int mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
int32_t mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
......@@ -325,10 +321,10 @@ int mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 0;
for (int i = tsGlobalConfigNum - 1; i >= 0; --i) {
for (int32_t i = tsGlobalConfigNum - 1; i >= 0; --i) {
SGlobalConfig *cfg = tsGlobalConfig + i;
if (!mgmtCheckConfigShow(cfg)) continue;
pShow->numOfRows++;
......@@ -340,15 +336,15 @@ int mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int mgmtRetrieveConfigs(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t numOfRows = 0;
for (int i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) {
for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) {
SGlobalConfig *cfg = tsGlobalConfig + i;
if (!mgmtCheckConfigShow(cfg)) continue;
char *pWrite;
int cols = 0;
int32_t cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(pWrite, TSDB_CFG_OPTION_LEN, "%s", cfg->option);
......@@ -388,7 +384,7 @@ int mgmtRetrieveConfigs(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
}
int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
int32_t cols = 0;
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
......@@ -422,7 +418,7 @@ int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
// TODO: if other thread drop dnode ????
SDnodeObj *pDnode = NULL;
......@@ -435,7 +431,7 @@ int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
SVnodeLoad* pVnode;
pShow->numOfRows = 0;
for (int i = 0 ; i < TSDB_MAX_VNODES; i++) {
for (int32_t i = 0 ; i < TSDB_MAX_VNODES; i++) {
pVnode = &pDnode->vload[i];
if (0 != pVnode->vgId) {
pShow->numOfRows++;
......@@ -460,11 +456,11 @@ int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int cols = 0;
int32_t cols = 0;
if (0 == rows) return 0;
......@@ -473,7 +469,7 @@ int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pCon
pDnode = (SDnodeObj *)(pShow->pNode);
if (pDnode != NULL) {
SVnodeLoad* pVnode;
for (int i = 0 ; i < TSDB_MAX_VNODES; i++) {
for (int32_t i = 0 ; i < TSDB_MAX_VNODES; i++) {
pVnode = &pDnode->vload[i];
if (0 == pVnode->vgId) {
continue;
......@@ -509,56 +505,76 @@ int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pCon
return numOfRows;
}
SDnodeObj dnodeObj;
extern uint32_t tsRebootTime;
SDnodeObj *mgmtGetDnodeImp(uint32_t ip) {
return &tsDnodeObj;
}
SDnodeObj* mgmtGetDnodeImp(uint32_t ip) { return &dnodeObj; }
SDnodeObj* (*mgmtGetDnode)(uint32_t ip) = mgmtGetDnodeImp;
SDnodeObj *(*mgmtGetDnode)(uint32_t ip) = mgmtGetDnodeImp;
int32_t mgmtUpdateDnodeImp(SDnodeObj *pDnode) {
return 0;
}
int32_t mgmtUpdateDnodeImp(SDnodeObj *pDnode) { return 0; }
int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode) = mgmtUpdateDnodeImp;
void mgmtCleanUpDnodesImp() {}
void mgmtCleanUpDnodesImp() {
}
void (*mgmtCleanUpDnodes)() = mgmtCleanUpDnodesImp;
int32_t mgmtInitDnodesImp() {
dnodeObj.privateIp = inet_addr(tsPrivateIp);;
dnodeObj.createdTime = (int64_t)tsRebootTime * 1000;
dnodeObj.lastReboot = tsRebootTime;
dnodeObj.numOfCores = (uint16_t)tsNumOfCores;
dnodeObj.status = TSDB_DN_STATUS_READY;
dnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY;
dnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
dnodeObj.thandle = (void*)(1); //hack way
if (dnodeObj.numOfVnodes == TSDB_INVALID_VNODE_NUM) {
mgmtSetDnodeMaxVnodes(&dnodeObj);
mPrint("dnode first access, set total vnodes:%d", dnodeObj.numOfVnodes);
tsDnodeObj.privateIp = inet_addr(tsPrivateIp);;
tsDnodeObj.createdTime = taosGetTimestampMs();
tsDnodeObj.lastReboot = taosGetTimestampSec();
tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores;
tsDnodeObj.status = TSDB_DN_STATUS_READY;
tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY;
tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
tsDnodeObj.thandle = (void *) (1); //hack way
if (tsDnodeObj.numOfVnodes == TSDB_INVALID_VNODE_NUM) {
mgmtSetDnodeMaxVnodes(&tsDnodeObj);
mPrint("dnode first access, set total vnodes:%d", tsDnodeObj.numOfVnodes);
}
tsDnodeObj.status = TSDB_DN_STATUS_READY;
return 0;
}
int32_t (*mgmtInitDnodes)() = mgmtInitDnodesImp;
int32_t mgmtGetDnodesNumImp() { return 1; }
int32_t mgmtGetDnodesNumImp() {
return 1;
}
int32_t (*mgmtGetDnodesNum)() = mgmtGetDnodesNumImp;
void* mgmtGetNextDnodeImp(SShowObj *pShow, SDnodeObj **pDnode) {
void *mgmtGetNextDnodeImp(SShowObj *pShow, SDnodeObj **pDnode) {
if (*pDnode == NULL) {
*pDnode = &dnodeObj;
*pDnode = &tsDnodeObj;
} else {
*pDnode = NULL;
}
return *pDnode;
}
void* (*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode) = mgmtGetNextDnodeImp;
int32_t mgmtGetScoresMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; }
void *(*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode) = mgmtGetNextDnodeImp;
int32_t mgmtGetScoresMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
int32_t (*mgmtGetScoresMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = mgmtGetScoresMetaImp;
int32_t mgmtRetrieveScoresImp(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { return 0; }
int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int rows, SConnObj *pConn) = mgmtRetrieveScoresImp;
int32_t mgmtRetrieveScoresImp(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
return 0;
}
int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) = mgmtRetrieveScoresImp;
void mgmtSetDnodeUnRemoveImp(SDnodeObj *pDnode) {
}
void mgmtSetDnodeUnRemoveImp(SDnodeObj *pDnode) {}
void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode) = mgmtSetDnodeUnRemoveImp;
bool mgmtCheckConfigShowImp(SGlobalConfig *cfg) {
......@@ -568,4 +584,5 @@ bool mgmtCheckConfigShowImp(SGlobalConfig *cfg) {
return false;
return true;
}
bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg) = mgmtCheckConfigShowImp;
\ No newline at end of file
......@@ -538,7 +538,7 @@ void (*mgmtCleanUpDnodeInt)() = mgmtCleanUpDnodeIntImp;
void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) {
/*
SDnodeObj *pObj = &dnodeObj;
SDnodeObj *pObj = &tsDnodeObj;
pObj->openVnodes = tsOpenVnodes;
pObj->status = TSDB_DN_STATUS_READY;
......
......@@ -16,8 +16,14 @@
#define _DEFAULT_SOURCE
#include "mgmtMnode.h"
int32_t mgmtGetMnodeMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; }
int32_t mgmtGetMnodeMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
int32_t (*mgmtGetMnodeMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = mgmtGetMnodeMetaImp;
int32_t mgmtRetrieveMnodesImp(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { return 0; }
int32_t mgmtRetrieveMnodesImp(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
return 0;
}
int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) = mgmtRetrieveMnodesImp;
......@@ -352,7 +352,7 @@ int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int
}
}
SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pTable->tableId);
return TSDB_CODE_APP_ERROR;
......@@ -388,7 +388,7 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName)
return TSDB_CODE_APP_ERROR;
}
SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pTable->tableId);
return TSDB_CODE_APP_ERROR;
......
......@@ -229,7 +229,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
// strcpy(pCreateMsg->meterId, pInfo->meterId);
//
// SDbObj* pMeterDb = mgmtGetDbByMeterId(pCreateMsg->meterId);
// SDbObj* pMeterDb = mgmtGetDbByTableId(pCreateMsg->meterId);
// mTrace("meter:%s, pConnDb:%p, pConnDbName:%s, pMeterDb:%p, pMeterDbName:%s",
// pCreateMsg->meterId, pDb, pDb->name, pMeterDb, pMeterDb->name);
// assert(pDb == pMeterDb);
......@@ -411,7 +411,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
//
// // get meter schema, and fill into resp payload
// pMeterObj = mgmtGetTable(tblName);
// pDbObj = mgmtGetDbByMeterId(tblName);
// pDbObj = mgmtGetDbByTableId(tblName);
//
// if (pMeterObj == NULL || (pDbObj == NULL)) {
// continue;
......
......@@ -188,7 +188,7 @@ int32_t mgmtInitSuperTables() {
break;
}
SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("super table:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsSuperTableSdb, pTable);
......@@ -429,7 +429,7 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pMetric, SSchema schema[], int n
}
}
SDbObj *pDb = mgmtGetDbByMeterId(pMetric->tableId);
SDbObj *pDb = mgmtGetDbByTableId(pMetric->tableId);
if (pDb == NULL) {
mError("meter: %s not belongs to any database", pMetric->tableId);
return TSDB_CODE_APP_ERROR;
......@@ -468,7 +468,7 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pMetric, char *colName) {
return TSDB_CODE_APP_ERROR;
}
SDbObj *pDb = mgmtGetDbByMeterId(pMetric->tableId);
SDbObj *pDb = mgmtGetDbByTableId(pMetric->tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pMetric->tableId);
return TSDB_CODE_APP_ERROR;
......
......@@ -15,37 +15,34 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "tschemautil.h"
#include "tstatus.h"
#include "mnode.h"
#include "mgmtBalance.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtDnodeInt.h"
#include "mgmtVgroup.h"
#include "mgmtTable.h"
#include "tschemautil.h"
#include "tlog.h"
#include "tstatus.h"
#include "taoserror.h"
#include "mgmtVgroup.h"
void * vgSdb = NULL;
int tsVgUpdateSize;
void * tsVgroupSdb = NULL;
int32_t tsVgUpdateSize;
extern void *tsDbSdb;
extern void *acctSdb;
extern void *tsUserSdb;
extern void *dnodeSdb;
void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionDestroy(void *row, char *str, int size, int *ssize);
void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
void mgmtVgroupActionInit() {
mgmtVgroupActionFp[SDB_TYPE_INSERT] = mgmtVgroupActionInsert;
......@@ -53,21 +50,18 @@ void mgmtVgroupActionInit() {
mgmtVgroupActionFp[SDB_TYPE_UPDATE] = mgmtVgroupActionUpdate;
mgmtVgroupActionFp[SDB_TYPE_ENCODE] = mgmtVgroupActionEncode;
mgmtVgroupActionFp[SDB_TYPE_DECODE] = mgmtVgroupActionDecode;
mgmtVgroupActionFp[SDB_TYPE_BEFORE_BATCH_UPDATE] = mgmtVgroupActionBeforeBatchUpdate;
mgmtVgroupActionFp[SDB_TYPE_BATCH_UPDATE] = mgmtVgroupActionBatchUpdate;
mgmtVgroupActionFp[SDB_TYPE_AFTER_BATCH_UPDATE] = mgmtVgroupActionAfterBatchUpdate;
mgmtVgroupActionFp[SDB_TYPE_RESET] = mgmtVgroupActionReset;
mgmtVgroupActionFp[SDB_TYPE_DESTROY] = mgmtVgroupActionDestroy;
}
void *mgmtVgroupAction(char action, void *row, char *str, int size, int *ssize) {
void *mgmtVgroupAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
if (mgmtVgroupActionFp[(uint8_t)action] != NULL) {
return (*(mgmtVgroupActionFp[(uint8_t)action]))(row, str, size, ssize);
}
return NULL;
}
int mgmtInitVgroups() {
int32_t mgmtInitVgroups() {
void * pNode = NULL;
SVgObj *pVgroup = NULL;
......@@ -76,14 +70,14 @@ int mgmtInitVgroups() {
SVgObj tObj;
tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
vgSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction);
if (vgSdb == NULL) {
tsVgroupSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction);
if (tsVgroupSdb == NULL) {
mError("failed to init vgroup data");
return -1;
}
while (1) {
pNode = sdbFetchRow(vgSdb, pNode, (void **)&pVgroup);
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
if (pVgroup == NULL) break;
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
......@@ -91,7 +85,7 @@ int mgmtInitVgroups() {
pVgroup->prev = NULL;
pVgroup->next = NULL;
int size = sizeof(STabObj *) * pDb->cfg.maxSessions;
int32_t size = sizeof(STabObj *) * pDb->cfg.maxSessions;
pVgroup->meterList = (STabObj **)malloc(size);
if (pVgroup->meterList == NULL) {
mError("failed to malloc(size:%d) for the meterList of vgroups", size);
......@@ -109,15 +103,10 @@ int mgmtInitVgroups() {
taosIdPoolReinit(pVgroup->idPool);
if (tsIsCluster) {
/*
* Upgrade from open source version to cluster version for the first time
*/
if (pVgroup->vnodeGid[0].publicIp == 0) {
pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp);
pVgroup->vnodeGid[0].ip = inet_addr(tsPrivateIp);
sdbUpdateRow(vgSdb, pVgroup, tsVgUpdateSize, 1);
}
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1);
}
mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
......@@ -127,7 +116,9 @@ int mgmtInitVgroups() {
return 0;
}
SVgObj *mgmtGetVgroup(int vgId) { return (SVgObj *)sdbGetRow(vgSdb, &vgId); }
SVgObj *mgmtGetVgroup(int32_t vgId) {
return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId);
}
SVgObj *mgmtGetAvailVgroup(SDbObj *pDb) {
SVgObj *pVgroup = pDb->pHead;
......@@ -191,7 +182,7 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) {
SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
SVgObj *pVgroup;
int size;
int32_t size;
size = sizeof(SVgObj);
pVgroup = (SVgObj *)malloc(size);
......@@ -209,10 +200,10 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
return NULL;
}
sdbInsertRow(vgSdb, pVgroup, 0);
sdbInsertRow(tsVgroupSdb, pVgroup, 0);
mTrace("vgroup:%d, vgroup is created, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int i = 0; i < pVgroup->numOfVnodes; ++i)
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i)
mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode);
mgmtSendVPeersMsg(pVgroup);
......@@ -220,11 +211,11 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
return pVgroup;
}
int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
STabObj *pTable;
if (pVgroup->numOfMeters > 0) {
for (int i = 0; i < pDb->cfg.maxSessions; ++i) {
for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) {
if (pVgroup->meterList != NULL) {
pTable = pVgroup->meterList[i];
if (pTable) mgmtDropTable(pDb, pTable->meterId, 0);
......@@ -234,7 +225,7 @@ int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
mgmtSendFreeVnodeMsg(pVgroup);
sdbDeleteRow(vgSdb, pVgroup);
sdbDeleteRow(tsVgroupSdb, pVgroup);
return 0;
}
......@@ -245,7 +236,7 @@ void mgmtSetVgroupIdPool() {
SDbObj *pDb;
while (1) {
pNode = sdbFetchRow(vgSdb, pNode, (void **)&pVgroup);
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
if (pVgroup == NULL || pVgroup->idPool == 0) break;
taosIdPoolSetFreeList(pVgroup->idPool);
......@@ -260,10 +251,10 @@ void mgmtSetVgroupIdPool() {
}
}
void mgmtCleanUpVgroups() { sdbCloseTable(vgSdb); }
void mgmtCleanUpVgroups() { sdbCloseTable(tsVgroupSdb); }
int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
......@@ -290,7 +281,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
int maxReplica = 0;
int32_t maxReplica = 0;
SVgObj *pVgroup = NULL;
STabObj *pTable = NULL;
if (pShow->payloadLen > 0 ) {
......@@ -311,7 +302,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
}
}
for (int i = 0; i < maxReplica; ++i) {
for (int32_t i = 0; i < maxReplica; ++i) {
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip");
......@@ -341,7 +332,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
......@@ -356,14 +347,14 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
char * pWrite;
int cols = 0;
int32_t cols = 0;
char ipstr[20];
int maxReplica = 0;
int32_t maxReplica = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
......@@ -376,7 +367,7 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
}
while (numOfRows < rows) {
// pShow->pNode = sdbFetchRow(vgSdb, pShow->pNode, (void **)&pVgroup);
// pShow->pNode = sdbFetchRow(tsVgroupSdb, pShow->pNode, (void **)&pVgroup);
pVgroup = (SVgObj *)pShow->pNode;
if (pVgroup == NULL) break;
pShow->pNode = (void *)pVgroup->next;
......@@ -395,7 +386,7 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus));
cols++;
for (int i = 0; i < maxReplica; ++i) {
for (int32_t i = 0; i < maxReplica; ++i) {
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
......@@ -427,13 +418,13 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
return numOfRows;
}
void *mgmtVgroupActionInsert(void *row, char *str, int size, int *ssize) {
void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)row;
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) return NULL;
int tsize = sizeof(STabObj *) * pDb->cfg.maxSessions;
int32_t tsize = sizeof(STabObj *) * pDb->cfg.maxSessions;
pVgroup->meterList = (STabObj **)malloc(tsize);
memset(pVgroup->meterList, 0, tsize);
pVgroup->numOfMeters = 0;
......@@ -444,7 +435,7 @@ void *mgmtVgroupActionInsert(void *row, char *str, int size, int *ssize) {
return NULL;
}
void *mgmtVgroupActionDelete(void *row, char *str, int size, int *ssize) {
void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)row;
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
......@@ -455,17 +446,17 @@ void *mgmtVgroupActionDelete(void *row, char *str, int size, int *ssize) {
return NULL;
}
void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize) {
void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
mgmtVgroupActionReset(row, str, size, ssize);
SVgObj *pVgroup = (SVgObj *)row;
int oldTables = taosIdPoolMaxSize(pVgroup->idPool);
int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool);
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb != NULL) {
if (pDb->cfg.maxSessions != oldTables) {
mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions);
taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions);
int size = sizeof(STabObj *) * pDb->cfg.maxSessions;
int32_t size = sizeof(STabObj *) * pDb->cfg.maxSessions;
pVgroup->meterList = (STabObj **)realloc(pVgroup->meterList, size);
}
}
......@@ -474,9 +465,9 @@ void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize) {
return NULL;
}
void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize) {
void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)row;
int tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
if (size < tsize) {
*ssize = -1;
} else {
......@@ -486,28 +477,28 @@ void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize) {
return NULL;
}
void *mgmtVgroupActionDecode(void *row, char *str, int size, int *ssize) {
void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)malloc(sizeof(SVgObj));
if (pVgroup == NULL) return NULL;
memset(pVgroup, 0, sizeof(SVgObj));
int tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
memcpy(pVgroup, str, tsize);
return (void *)pVgroup;
}
void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; }
void *mgmtVgroupActionBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; }
void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; }
void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize) {
void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; }
void *mgmtVgroupActionBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; }
void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; }
void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)row;
int tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
memcpy(pVgroup, str, tsize);
return NULL;
}
void *mgmtVgroupActionDestroy(void *row, char *str, int size, int *ssize) {
void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)row;
if (pVgroup->idPool) {
taosIdPoolCleanUp(pVgroup->idPool);
......
......@@ -20,55 +20,37 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include <semaphore.h>
#include "os.h"
typedef struct _msg_header {
int mid; /* message ID */
int cid; /* call ID */
int tid; /* transaction ID */
// int len; /* length of msg */
char *msg; /* content holder */
} msg_header_t, msg_t;
enum _module {
TSDB_MOD_MGMT,
TSDB_MOD_HTTP,
TSDB_MOD_MONITOR,
TSDB_MOD_DCLUSTER,
TSDB_MOD_MSTORAGE,
TSDB_MOD_MAX
};
typedef struct {
char * name; /* module name */
pthread_t thread; /* thread ID */
tsem_t emptySem;
tsem_t fullSem;
int fullSlot;
int emptySlot;
int debugFlag;
int queueSize;
int msgSize;
pthread_mutex_t queueMutex;
pthread_mutex_t stmMutex;
msg_t * queue;
int (*processMsg)(msg_t *);
int (*init)();
void (*cleanUp)();
} module_t;
#define tsetModuleStatus(mod) \
{ tsModuleStatus |= (1 << mod); }
#define tclearModuleStatus(mod) \
{ tsModuleStatus &= ~(1 << mod); }
typedef struct {
short len;
unsigned char data[0];
} sim_data_t;
extern int maxCid;
extern module_t moduleObj[];
extern char * msgName[];
extern int taosSendMsgToModule(module_t *mod_p, int cid, int mid, int tid, char *msg);
extern char *taosDisplayModuleStatus(int moduleNum);
extern int taosInitModule(module_t *);
extern void taosCleanUpModule(module_t *);
char *name;
int (*initFp)();
void (*cleanUpFp)();
int (*startFp)();
void (*stopFp)();
int num;
int curNum;
int equalVnodeNum;
} SModule;
extern uint32_t tsModuleStatus;
extern SModule tsModule[];
#ifdef __cplusplus
}
......
......@@ -13,170 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#define _DEFAULT_SOURCE
#include "tmodule.h"
#include "tutil.h"
void *taosProcessQueue(void *param);
char *taosDisplayModuleStatus(int moduleNum) {
static char status[256];
int i;
status[0] = 0;
for (i = 1; i < moduleNum; ++i)
if (taosCheckPthreadValid(moduleObj[i].thread)) sprintf(status + strlen(status), "%s ", moduleObj[i].name);
if (status[0] == 0)
sprintf(status, "all module is down");
else
sprintf(status, " is(are) up");
return status;
}
int taosInitModule(module_t *pMod) {
pthread_attr_t attr;
if (pthread_mutex_init(&pMod->queueMutex, NULL) < 0) {
printf("ERROR: init %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno));
taosCleanUpModule(pMod);
return -1;
}
if (pthread_mutex_init(&pMod->stmMutex, NULL) < 0) {
printf("ERROR: init %s stmMutex failed, reason:%s\n", pMod->name, strerror(errno));
taosCleanUpModule(pMod);
return -1;
}
if (tsem_init(&pMod->emptySem, 0, (unsigned int)pMod->queueSize) != 0) {
printf("ERROR: init %s empty semaphore failed, reason:%s\n", pMod->name, strerror(errno));
taosCleanUpModule(pMod);
return -1;
}
if (tsem_init(&pMod->fullSem, 0, 0) != 0) {
printf("ERROR: init %s full semaphore failed, reason:%s\n", pMod->name, strerror(errno));
taosCleanUpModule(pMod);
return -1;
}
if ((pMod->queue = (msg_t *)malloc((size_t)pMod->queueSize * sizeof(msg_t))) == NULL) {
printf("ERROR: %s no enough memory, reason:%s\n", pMod->name, strerror(errno));
taosCleanUpModule(pMod);
return -1;
}
memset(pMod->queue, 0, (size_t)pMod->queueSize * sizeof(msg_t));
pMod->fullSlot = 0;
pMod->emptySlot = 0;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pMod->thread, &attr, taosProcessQueue, (void *)pMod) != 0) {
printf("ERROR: %s failed to create thread, reason:%s\n", pMod->name, strerror(errno));
taosCleanUpModule(pMod);
return -1;
}
if (pMod->init) return (*(pMod->init))();
return 0;
}
void *taosProcessQueue(void *param) {
msg_t msg;
module_t *pMod = (module_t *)param;
int oldType;
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldType);
signal(SIGINT, SIG_IGN);
while (1) {
if (tsem_wait(&pMod->fullSem) != 0)
printf("ERROR: wait %s fullSem failed, reason:%s\n", pMod->name, strerror(errno));
if (pthread_mutex_lock(&pMod->queueMutex) != 0)
printf("ERROR: lock %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno));
msg = pMod->queue[pMod->fullSlot];
memset(&(pMod->queue[pMod->fullSlot]), 0, sizeof(msg_t));
pMod->fullSlot = (pMod->fullSlot + 1) % pMod->queueSize;
if (pthread_mutex_unlock(&pMod->queueMutex) != 0)
printf("ERROR: unlock %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno));
if (tsem_post(&pMod->emptySem) != 0)
printf("ERROR: post %s emptySem failed, reason:%s\n", pMod->name, strerror(errno));
/* process the message */
if (msg.cid < 0 || msg.cid >= maxCid) {
/*printf("ERROR: cid:%d is out of range, msg is discarded\n", msg.cid);*/
continue;
}
/*
if ( pthread_mutex_lock ( &(pMod->stmMutex)) != 0 )
printf("ERROR: lock %s stmMutex failed, reason:%s\n", pMod->name,
strerror(errno));
*/
(*(pMod->processMsg))(&msg);
tfree(msg.msg);
/*
if ( pthread_mutex_unlock ( &(pMod->stmMutex)) != 0 )
printf("ERROR: unlock %s stmMutex failed, reason:%s\n", pMod->name,
strerror(errno));
*/
}
}
int taosSendMsgToModule(module_t *pMod, int cid, int mid, int tid, char *msg) {
if (tsem_wait(&pMod->emptySem) != 0)
printf("ERROR: wait %s emptySem failed, reason:%s\n", pMod->name, strerror(errno));
if (pthread_mutex_lock(&pMod->queueMutex) != 0)
printf("ERROR: lock %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno));
pMod->queue[pMod->emptySlot].cid = cid;
pMod->queue[pMod->emptySlot].mid = mid;
pMod->queue[pMod->emptySlot].tid = tid;
pMod->queue[pMod->emptySlot].msg = msg;
pMod->emptySlot = (pMod->emptySlot + 1) % pMod->queueSize;
if (pthread_mutex_unlock(&pMod->queueMutex) != 0)
printf("ERROR: unlock %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno));
if (tsem_post(&pMod->fullSem) != 0) printf("ERROR: post %s fullSem failed, reason:%s\n", pMod->name, strerror(errno));
return 0;
}
void taosCleanUpModule(module_t *pMod) {
int i;
if (pMod->cleanUp) pMod->cleanUp();
if (taosCheckPthreadValid(pMod->thread)) {
pthread_cancel(pMod->thread);
pthread_join(pMod->thread, NULL);
}
taosResetPthread(&pMod->thread);
tsem_destroy(&pMod->emptySem);
tsem_destroy(&pMod->fullSem);
pthread_mutex_destroy(&pMod->queueMutex);
pthread_mutex_destroy(&pMod->stmMutex);
for (i = 0; i < pMod->queueSize; ++i) {
tfree(pMod->queue[i].msg);
}
tfree(pMod->queue);
memset(pMod, 0, sizeof(module_t));
}
SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus = 0;
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册