提交 d90df175 编写于 作者: S slguan

failed to drop vnode in dnodeMgmt

上级 6639ef6e
...@@ -226,20 +226,18 @@ static void dnodeCheckDataDirOpenned(char *dir) { ...@@ -226,20 +226,18 @@ static void dnodeCheckDataDirOpenned(char *dir) {
static int32_t dnodeInitStorage() { static int32_t dnodeInitStorage() {
struct stat dirstat; struct stat dirstat;
strcpy(tsDirectory, dataDir);
if (stat(dataDir, &dirstat) < 0) { if (stat(dataDir, &dirstat) < 0) {
mkdir(dataDir, 0755); mkdir(dataDir, 0755);
} }
char fileName[128]; sprintf(tsMnodeDir, "%s/mnode", dataDir);
sprintf(fileName, "%s/tsdb", tsDirectory); sprintf(tsVnodeDir, "%s/vnode", dataDir);
mkdir(fileName, 0755); sprintf(tsDnodeDir, "%s/dnode", dataDir);
sprintf(fileName, "%s/data", tsDirectory); mkdir(tsMnodeDir, 0755);
mkdir(fileName, 0755); mkdir(tsVnodeDir, 0755);
sprintf(tsMgmtDirectory, "%s/mgmt", tsDirectory); mkdir(tsDnodeDir, 0755);
sprintf(tsDirectory, "%s/tsdb", dataDir);
dnodeCheckDataDirOpenned(dataDir); dnodeCheckDataDirOpenned(tsDnodeDir);
dPrint("storage directory is initialized"); dPrint("storage directory is initialized");
return 0; return 0;
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t vnode;
int32_t status; // status: master, slave, notready, deleting int32_t status; // status: master, slave, notready, deleting
int32_t refCount; // reference count int32_t refCount; // reference count
int64_t version; int64_t version;
...@@ -43,7 +44,7 @@ typedef struct { ...@@ -43,7 +44,7 @@ typedef struct {
static int32_t dnodeOpenVnodes(); static int32_t dnodeOpenVnodes();
static void dnodeCleanupVnodes(); static void dnodeCleanupVnodes();
static int32_t dnodeOpenVnode(int32_t vgId); static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir);
static void dnodeCleanupVnode(SVnodeObj *pVnode); static void dnodeCleanupVnode(SVnodeObj *pVnode);
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg); static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg);
static void dnodeDropVnode(SVnodeObj *pVnode); static void dnodeDropVnode(SVnodeObj *pVnode);
...@@ -79,7 +80,21 @@ int32_t dnodeInitMgmt() { ...@@ -79,7 +80,21 @@ int32_t dnodeInitMgmt() {
} }
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer); taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
return dnodeOpenVnodes(); SMDCreateVnodeMsg cfg;
cfg.cfg.vgId = 1;
cfg.cfg.precision = 0;
cfg.vnode = 1;
cfg.cfg.maxSessions = 1000;
cfg.cfg.daysPerFile = 10;
dnodeCreateVnode(&cfg);
SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId);
dnodeDropVnode(pVnode);
dnodeOpenVnodes();
dnodeCleanupVnodes();
//return dnodeOpenVnodes();
} }
void dnodeCleanupMgmt() { void dnodeCleanupMgmt() {
...@@ -149,28 +164,55 @@ void dnodeReleaseVnode(void *pVnode) { ...@@ -149,28 +164,55 @@ void dnodeReleaseVnode(void *pVnode) {
} }
static int32_t dnodeOpenVnodes() { static int32_t dnodeOpenVnodes() {
dPrint("open all vnodes"); DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) {
return TSDB_CODE_NO_WRITE_ACCESS;
}
int32_t numOfVnodes = 0;
struct dirent *de = NULL;
while ((de = readdir(dir)) != NULL) {
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
if (de->d_type & DT_DIR) {
if (strncmp("vnode", de->d_name, 5) != 0) continue;
int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue;
char tsdbDir[TSDB_FILENAME_LEN];
sprintf(tsdbDir, "%s/%s", tsVnodeDir, de->d_name);
int32_t code = dnodeOpenVnode(vnode, tsdbDir);
if (code == 0) {
numOfVnodes++;
}
}
}
closedir(dir);
dPrint("all vnodes is opened, num:%d", numOfVnodes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void dnodeCleanupVnodes() { static void dnodeCleanupVnodes() {
dPrint("clean all vnodes"); int32_t num = taosGetIntHashSize(tsDnodeVnodesHash);
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, dnodeCleanupVnode);
dPrint("all vnodes is opened, num:%d", num);
} }
static int32_t dnodeOpenVnode(int32_t vgId) { static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId);
void *pTsdb = tsdbOpenRepo(rootDir); void *pTsdb = tsdbOpenRepo(rootDir);
if (pTsdb != NULL) { if (pTsdb == NULL) {
dError("failed to open vnode:%d in dir:%s, reason:%s", vnode, rootDir, tstrerror(terrno));
return terrno; return terrno;
} }
//STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb);
SVnodeObj vnodeObj; SVnodeObj vnodeObj;
vnodeObj.vgId = vgId; vnodeObj.vgId = vnode; //tsdbInfo->tsdbCfg.vgId;
vnodeObj.vnode = vnode; //tsdbInfo->tsdbCfg.tsdbId;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = version;
vnodeObj.wworker = dnodeAllocateWriteWorker(); vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker(); vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wal = NULL; vnodeObj.wal = NULL;
...@@ -181,6 +223,7 @@ static int32_t dnodeOpenVnode(int32_t vgId) { ...@@ -181,6 +223,7 @@ static int32_t dnodeOpenVnode(int32_t vgId) {
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj));
dTrace("open vnode:%d in %s", vnodeObj.vnode, rootDir);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -209,11 +252,12 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) { ...@@ -209,11 +252,12 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) {
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
} }
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); dTrace("cleanup vnode:%d", pVnode->vnode);
} }
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
tsdbCfg.vgId = pVnodeCfg->cfg.vgId;
tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->vnode; tsdbCfg.tsdbId = pVnodeCfg->vnode;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
...@@ -224,15 +268,16 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -224,15 +268,16 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.maxCacheSize = -1; tsdbCfg.maxCacheSize = -1;
char rootDir[TSDB_FILENAME_LEN] = {0}; char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId); sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL); void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
if (pTsdb != NULL) { if (pTsdb == NULL) {
return terrno; return terrno;
} }
SVnodeObj vnodeObj; SVnodeObj vnodeObj;
vnodeObj.vgId = pVnodeCfg->cfg.vgId; vnodeObj.vgId = pVnodeCfg->cfg.vgId;
vnodeObj.vnode = pVnodeCfg->vnode;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = 0;
...@@ -263,6 +308,7 @@ static void dnodeDropVnode(SVnodeObj *pVnode) { ...@@ -263,6 +308,7 @@ static void dnodeDropVnode(SVnodeObj *pVnode) {
} }
dnodeCleanupVnode(pVnode); dnodeCleanupVnode(pVnode);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
} }
static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
...@@ -274,14 +320,13 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -274,14 +320,13 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
// SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
// if (pVnodeObj != NULL) { if (pVnodeObj != NULL) {
// rpcRsp.code = TSDB_CODE_SUCCESS; rpcRsp.code = TSDB_CODE_SUCCESS;
// } else { } else {
// rpcRsp.code = dnodeCreateVnode(pCreate); rpcRsp.code = dnodeCreateVnode(pCreate);
// } }
rpcRsp.code = TSDB_CODE_SUCCESS; rpcRsp.code = TSDB_CODE_SUCCESS;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -339,7 +384,6 @@ static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { ...@@ -339,7 +384,6 @@ static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
// dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); // dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
static void dnodeSendStatusMsg(void *handle, void *tmrId) { static void dnodeSendStatusMsg(void *handle, void *tmrId) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
if (tsStatusTimer == NULL) { if (tsStatusTimer == NULL) {
......
...@@ -60,11 +60,13 @@ int32_t dnodeInitRead() { ...@@ -60,11 +60,13 @@ int32_t dnodeInitRead() {
maxThreads = tsNumOfCores*tsNumOfThreadsPerCore; maxThreads = tsNumOfCores*tsNumOfThreadsPerCore;
if (maxThreads <= minThreads*2) maxThreads = 2*minThreads; if (maxThreads <= minThreads*2) maxThreads = 2*minThreads;
dPrint("dnode read is opened");
return 0; return 0;
} }
void dnodeCleanupRead() { void dnodeCleanupRead() {
taosCloseQset(readQset); taosCloseQset(readQset);
dPrint("dnode read is closed");
} }
void dnodeRead(SRpcMsg *pMsg) { void dnodeRead(SRpcMsg *pMsg) {
......
...@@ -76,11 +76,13 @@ int32_t dnodeInitWrite() { ...@@ -76,11 +76,13 @@ int32_t dnodeInitWrite() {
wWorkerPool.writeWorker[i].workerId = i; wWorkerPool.writeWorker[i].workerId = i;
} }
dPrint("dnode write is opened");
return 0; return 0;
} }
void dnodeCleanupWrite() { void dnodeCleanupWrite() {
free(wWorkerPool.writeWorker); free(wWorkerPool.writeWorker);
dPrint("dnode write is closed");
} }
void dnodeWrite(SRpcMsg *pMsg) { void dnodeWrite(SRpcMsg *pMsg) {
...@@ -145,11 +147,16 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -145,11 +147,16 @@ void dnodeWrite(SRpcMsg *pMsg) {
void *dnodeAllocateWriteWorker() { void *dnodeAllocateWriteWorker() {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg));
if (queue != NULL) return queue;
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset(); pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) return NULL; if (pWorker->qset == NULL) return NULL;
taosAddIntoQset(pWorker->qset, queue);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
pthread_attr_t thAttr; pthread_attr_t thAttr;
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
...@@ -158,14 +165,11 @@ void *dnodeAllocateWriteWorker() { ...@@ -158,14 +165,11 @@ void *dnodeAllocateWriteWorker() {
dError("failed to create thread to process read queue, reason:%s", strerror(errno)); dError("failed to create thread to process read queue, reason:%s", strerror(errno));
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
} }
} } else {
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg));
if (queue) {
taosAddIntoQset(pWorker->qset, queue); taosAddIntoQset(pWorker->qset, queue);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
} }
return queue; return queue;
} }
......
...@@ -263,7 +263,7 @@ void mgmtStopSystem(); ...@@ -263,7 +263,7 @@ void mgmtStopSystem();
extern char version[]; extern char version[];
extern void *tsMgmtTmr; extern void *tsMgmtTmr;
extern char tsMgmtDirectory[]; extern char tsMnodeDir[];
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -198,7 +198,7 @@ int32_t mgmtInitChildTables() { ...@@ -198,7 +198,7 @@ int32_t mgmtInitChildTables() {
tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsChildTableSdb = sdbOpenTable(tsMaxTables, tsChildTableUpdateSize, tsChildTableSdb = sdbOpenTable(tsMaxTables, tsChildTableUpdateSize,
"ctables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtChildTableAction); "ctables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtChildTableAction);
if (tsChildTableSdb == NULL) { if (tsChildTableSdb == NULL) {
mError("failed to init child table data"); mError("failed to init child table data");
return -1; return -1;
......
...@@ -81,7 +81,7 @@ int32_t mgmtInitDbs() { ...@@ -81,7 +81,7 @@ int32_t mgmtInitDbs() {
SDbObj tObj; SDbObj tObj;
tsDbUpdateSize = tObj.updateEnd - (char *)&tObj; tsDbUpdateSize = tObj.updateEnd - (char *)&tObj;
tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "dbs", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtDbAction); tsDbSdb = sdbOpenTable(tsMaxDbs, tsDbUpdateSize, "dbs", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtDbAction);
if (tsDbSdb == NULL) { if (tsDbSdb == NULL) {
mError("failed to init db data"); mError("failed to init db data");
return -1; return -1;
......
...@@ -31,7 +31,6 @@ ...@@ -31,7 +31,6 @@
#include "mgmtShell.h" #include "mgmtShell.h"
static int32_t mgmtCheckMgmtRunning(); static int32_t mgmtCheckMgmtRunning();
char tsMgmtDirectory[128] = {0};
void *tsMgmtTmr = NULL; void *tsMgmtTmr = NULL;
int32_t mgmtInitSystem() { int32_t mgmtInitSystem() {
...@@ -41,7 +40,7 @@ int32_t mgmtInitSystem() { ...@@ -41,7 +40,7 @@ int32_t mgmtInitSystem() {
} }
struct stat dirstat; struct stat dirstat;
bool fileExist = (stat(tsMgmtDirectory, &dirstat) == 0); bool fileExist = (stat(tsMnodeDir, &dirstat) == 0);
bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0); bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0);
if (asMaster || fileExist) { if (asMaster || fileExist) {
...@@ -57,8 +56,8 @@ int32_t mgmtStartSystem() { ...@@ -57,8 +56,8 @@ int32_t mgmtStartSystem() {
mPrint("starting to initialize TDengine mgmt ..."); mPrint("starting to initialize TDengine mgmt ...");
struct stat dirstat; struct stat dirstat;
if (stat(tsMgmtDirectory, &dirstat) < 0) { if (stat(tsMnodeDir, &dirstat) < 0) {
mkdir(tsMgmtDirectory, 0755); mkdir(tsMnodeDir, 0755);
} }
if (mgmtCheckMgmtRunning() != 0) { if (mgmtCheckMgmtRunning() != 0) {
...@@ -110,7 +109,7 @@ int32_t mgmtStartSystem() { ...@@ -110,7 +109,7 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (sdbInitPeers(tsMgmtDirectory) < 0) { if (sdbInitPeers(tsMnodeDir) < 0) {
mError("failed to init peers"); mError("failed to init peers");
return -1; return -1;
} }
...@@ -132,7 +131,7 @@ void mgmtStopSystem() { ...@@ -132,7 +131,7 @@ void mgmtStopSystem() {
} }
mgmtCleanUpSystem(); mgmtCleanUpSystem();
remove(tsMgmtDirectory); remove(tsMnodeDir);
} }
void mgmtCleanUpSystem() { void mgmtCleanUpSystem() {
......
...@@ -224,7 +224,7 @@ int32_t mgmtInitNormalTables() { ...@@ -224,7 +224,7 @@ int32_t mgmtInitNormalTables() {
tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsNormalTableSdb = sdbOpenTable(tsMaxTables, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, tsNormalTableSdb = sdbOpenTable(tsMaxTables, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
"ntables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtNormalTableAction); "ntables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtNormalTableAction);
if (tsNormalTableSdb == NULL) { if (tsNormalTableSdb == NULL) {
mError("failed to init ntables data"); mError("failed to init ntables data");
return -1; return -1;
......
...@@ -165,7 +165,7 @@ int32_t mgmtInitSuperTables() { ...@@ -165,7 +165,7 @@ int32_t mgmtInitSuperTables() {
mgmtSuperTableActionInit(); mgmtSuperTableActionInit();
tsSuperTableSdb = sdbOpenTable(tsMaxTables, tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS, tsSuperTableSdb = sdbOpenTable(tsMaxTables, tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS,
"stables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction); "stables", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtSuperTableAction);
if (tsSuperTableSdb == NULL) { if (tsSuperTableSdb == NULL) {
mError("failed to init stables data"); mError("failed to init stables data");
return -1; return -1;
......
...@@ -59,7 +59,7 @@ int32_t mgmtInitUsers() { ...@@ -59,7 +59,7 @@ int32_t mgmtInitUsers() {
SUserObj tObj; SUserObj tObj;
tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsUserUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "users", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtUserAction); tsUserSdb = sdbOpenTable(tsMaxUsers, tsUserUpdateSize, "users", SDB_KEYTYPE_STRING, tsMnodeDir, mgmtUserAction);
if (tsUserSdb == NULL) { if (tsUserSdb == NULL) {
mError("failed to init user data"); mError("failed to init user data");
return -1; return -1;
......
...@@ -73,7 +73,7 @@ int32_t mgmtInitVgroups() { ...@@ -73,7 +73,7 @@ int32_t mgmtInitVgroups() {
mgmtVgroupActionInit(); mgmtVgroupActionInit();
tsVgroupSdb = sdbOpenTable(tsMaxVGroups, tsVgUpdateSize, "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction); tsVgroupSdb = sdbOpenTable(tsMaxVGroups, tsVgUpdateSize, "vgroups", SDB_KEYTYPE_AUTO, tsMnodeDir, mgmtVgroupAction);
if (tsVgroupSdb == NULL) { if (tsVgroupSdb == NULL) {
mError("failed to init vgroups data"); mError("failed to init vgroups data");
return -1; return -1;
......
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#include "tutil.h" #include "tutil.h"
char configDir[TSDB_FILENAME_LEN] = "/etc/taos"; char configDir[TSDB_FILENAME_LEN] = "/etc/taos";
char tsDirectory[TSDB_FILENAME_LEN] = "/var/lib/taos"; char tsVnodeDir[TSDB_FILENAME_LEN] = "/var/lib/taos";
char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos"; char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos";
char logDir[TSDB_FILENAME_LEN] = "~/TDengineLog"; char logDir[TSDB_FILENAME_LEN] = "~/TDengineLog";
char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos"; char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos";
......
...@@ -35,7 +35,9 @@ ...@@ -35,7 +35,9 @@
#include "ttimer.h" #include "ttimer.h"
char configDir[TSDB_FILENAME_LEN] = "/etc/taos"; char configDir[TSDB_FILENAME_LEN] = "/etc/taos";
char tsDirectory[TSDB_FILENAME_LEN] = "/var/lib/taos"; char tsVnodeDir[TSDB_FILENAME_LEN] = {0};
char tsDnodeDir[TSDB_FILENAME_LEN] = {0};
char tsMnodeDir[TSDB_FILENAME_LEN] = {0};
char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos"; char dataDir[TSDB_FILENAME_LEN] = "/var/lib/taos";
char logDir[TSDB_FILENAME_LEN] = "/var/log/taos"; char logDir[TSDB_FILENAME_LEN] = "/var/log/taos";
char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos"; char scriptDir[TSDB_FILENAME_LEN] = "/etc/taos";
......
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
#include <Winsock2.h> #include <Winsock2.h>
char configDir[TSDB_FILENAME_LEN] = "C:/TDengine/cfg"; char configDir[TSDB_FILENAME_LEN] = "C:/TDengine/cfg";
char tsDirectory[TSDB_FILENAME_LEN] = "C:/TDengine/data"; char tsVnodeDir[TSDB_FILENAME_LEN] = "C:/TDengine/data";
char logDir[TSDB_FILENAME_LEN] = "C:/TDengine/log"; char logDir[TSDB_FILENAME_LEN] = "C:/TDengine/log";
char dataDir[TSDB_FILENAME_LEN] = "C:/TDengine/data"; char dataDir[TSDB_FILENAME_LEN] = "C:/TDengine/data";
char scriptDir[TSDB_FILENAME_LEN] = "C:/TDengine/script"; char scriptDir[TSDB_FILENAME_LEN] = "C:/TDengine/script";
......
...@@ -34,6 +34,12 @@ char *taosAddIntHash(void *handle, uint64_t key, char *pData); ...@@ -34,6 +34,12 @@ char *taosAddIntHash(void *handle, uint64_t key, char *pData);
int32_t taosHashInt(void *handle, uint64_t key); int32_t taosHashInt(void *handle, uint64_t key);
void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *));
char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *));
int32_t taosGetIntHashSize(void *handle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -50,7 +50,9 @@ extern int tscEmbedded; ...@@ -50,7 +50,9 @@ extern int tscEmbedded;
extern int64_t tsMsPerDay[2]; extern int64_t tsMsPerDay[2];
extern char configDir[]; extern char configDir[];
extern char tsDirectory[]; extern char tsVnodeDir[];
extern char tsDnodeDir[];
extern char tsMnodeDir[];
extern char dataDir[]; extern char dataDir[];
extern char logDir[]; extern char logDir[];
extern char scriptDir[]; extern char scriptDir[];
...@@ -263,9 +265,6 @@ SGlobalConfig *tsGetConfigOption(const char *option); ...@@ -263,9 +265,6 @@ SGlobalConfig *tsGetConfigOption(const char *option);
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
extern char tsMgmtDirectory[];
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -26,7 +26,7 @@ typedef struct { ...@@ -26,7 +26,7 @@ typedef struct {
IHashNode **hashList; IHashNode **hashList;
int32_t maxSessions; int32_t maxSessions;
int32_t dataSize; int32_t dataSize;
int32_t (*hashFp)(void *, uint64_t key); int32_t (*hashFp)(void *, uint64_t key);
pthread_mutex_t mutex; pthread_mutex_t mutex;
} IHashObj; } IHashObj;
...@@ -186,3 +186,93 @@ void taosCleanUpIntHash(void *handle) { ...@@ -186,3 +186,93 @@ void taosCleanUpIntHash(void *handle) {
memset(pObj, 0, sizeof(IHashObj)); memset(pObj, 0, sizeof(IHashObj));
free(pObj); free(pObj);
} }
void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)) {
IHashObj * pObj;
IHashNode *pNode, *pNext;
pObj = (IHashObj *)handle;
if (pObj == NULL || pObj->maxSessions <= 0) return;
pthread_mutex_lock(&pObj->mutex);
if (pObj->hashList) {
for (int i = 0; i < pObj->maxSessions; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
if (fp != NULL) fp(pNode->data);
free(pNode);
pNode = pNext;
}
}
free(pObj->hashList);
}
pthread_mutex_unlock(&pObj->mutex);
pthread_mutex_destroy(&pObj->mutex);
memset(pObj, 0, sizeof(IHashObj));
free(pObj);
}
char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *)) {
IHashObj * pObj;
IHashNode *pNode, *pNext;
char * pData = NULL;
pObj = (IHashObj *)handle;
if (pObj == NULL || pObj->maxSessions <= 0) return NULL;
pthread_mutex_lock(&pObj->mutex);
if (pObj->hashList) {
for (int i = 0; i < pObj->maxSessions; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
int flag = fp(pNode->data);
if (flag) {
pData = pNode->data;
goto VisitEnd;
}
pNode = pNext;
}
}
}
VisitEnd:
pthread_mutex_unlock(&pObj->mutex);
return pData;
}
int32_t taosGetIntHashSize(void *handle) {
IHashObj * pObj;
IHashNode *pNode, *pNext;
char * pData = NULL;
int32_t num = 0;
pObj = (IHashObj *)handle;
if (pObj == NULL || pObj->maxSessions <= 0) return NULL;
pthread_mutex_lock(&pObj->mutex);
if (pObj->hashList) {
for (int i = 0; i < pObj->maxSessions; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
num++;
pNode = pNext;
}
}
}
pthread_mutex_unlock(&pObj->mutex);
return num;
}
\ No newline at end of file
...@@ -33,7 +33,7 @@ typedef struct { ...@@ -33,7 +33,7 @@ typedef struct {
SHashNode **hashList; SHashNode **hashList;
uint32_t maxSessions; uint32_t maxSessions;
uint32_t dataSize; uint32_t dataSize;
uint32_t (*hashFp)(void *, char *string); uint32_t (*hashFp)(void *, char *string);
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SHashObj; } SHashObj;
......
...@@ -58,6 +58,7 @@ enum { TSDB_PRECISION_MILLI, TSDB_PRECISION_MICRO, TSDB_PRECISION_NANO }; ...@@ -58,6 +58,7 @@ enum { TSDB_PRECISION_MILLI, TSDB_PRECISION_MICRO, TSDB_PRECISION_NANO };
// the TSDB repository configuration // the TSDB repository configuration
typedef struct { typedef struct {
int8_t precision; int8_t precision;
int32_t vgId;
int32_t tsdbId; int32_t tsdbId;
int32_t maxTables; // maximum number of tables this repository can have int32_t maxTables; // maximum number of tables this repository can have
int32_t daysPerFile; // day per file sharding policy int32_t daysPerFile; // day per file sharding policy
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册