提交 18c871a9 编写于 作者: S Shengliang Guan

TD-10431 add global variables in sdb

上级 35b0c693
...@@ -126,6 +126,10 @@ typedef enum { ...@@ -126,6 +126,10 @@ typedef enum {
SDB_MAX = 12 SDB_MAX = 12
} ESdbType; } ESdbType;
typedef struct SSdbOpt {
const char *path;
} SSdbOpt;
typedef int32_t (*SdbInsertFp)(void *pObj); typedef int32_t (*SdbInsertFp)(void *pObj);
typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj); typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj);
typedef int32_t (*SdbDeleteFp)(void *pObj); typedef int32_t (*SdbDeleteFp)(void *pObj);
...@@ -146,12 +150,12 @@ typedef struct { ...@@ -146,12 +150,12 @@ typedef struct {
typedef struct SSdb SSdb; typedef struct SSdb SSdb;
int32_t sdbInit(); SSdb *sdbOpen(SSdbOpt *pOption);
void sdbCleanup(); void sdbClose(SSdb *pSdb);
void sdbSetTable(SSdbTable table); void sdbSetTable(SSdb *pSdb, SSdbTable table);
int32_t sdbOpen(); // int32_t sdbOpen();
void sdbClose(); // void sdbClose();
int32_t sdbWrite(SSdbRaw *pRaw); int32_t sdbWrite(SSdbRaw *pRaw);
int32_t sdbDeploy(); int32_t sdbDeploy();
......
...@@ -43,6 +43,7 @@ typedef struct SMnode { ...@@ -43,6 +43,7 @@ typedef struct SMnode {
tmr_h timer; tmr_h timer;
SSdb *pSdb; SSdb *pSdb;
SDnode *pDnode; SDnode *pDnode;
char *path;
SArray steps; SArray steps;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX]; MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
......
...@@ -110,8 +110,8 @@ int32_t mndInitAcct(SMnode *pMnode) { ...@@ -110,8 +110,8 @@ int32_t mndInitAcct(SMnode *pMnode) {
.insertFp = (SdbInsertFp)mnodeAcctActionInsert, .insertFp = (SdbInsertFp)mnodeAcctActionInsert,
.updateFp = (SdbUpdateFp)mnodeAcctActionUpdate, .updateFp = (SdbUpdateFp)mnodeAcctActionUpdate,
.deleteFp = (SdbDeleteFp)mnodeAcctActionDelete}; .deleteFp = (SdbDeleteFp)mnodeAcctActionDelete};
sdbSetTable(table);
sdbSetTable(pMnode->pSdb, table);
return 0; return 0;
} }
......
...@@ -320,7 +320,7 @@ int32_t mndInitTrans(SMnode *pMnode) { ...@@ -320,7 +320,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
.insertFp = (SdbInsertFp)trnActionInsert, .insertFp = (SdbInsertFp)trnActionInsert,
.updateFp = (SdbUpdateFp)trnActionUpdate, .updateFp = (SdbUpdateFp)trnActionUpdate,
.deleteFp = (SdbDeleteFp)trnActionDelete}; .deleteFp = (SdbDeleteFp)trnActionDelete};
sdbSetTable(table); sdbSetTable(pMnode->pSdb, table);
mInfo("trn module is initialized"); mInfo("trn module is initialized");
return 0; return 0;
......
...@@ -223,7 +223,7 @@ int32_t mndInitUser(SMnode *pMnode) { ...@@ -223,7 +223,7 @@ int32_t mndInitUser(SMnode *pMnode) {
.insertFp = (SdbInsertFp)mndUserActionInsert, .insertFp = (SdbInsertFp)mndUserActionInsert,
.updateFp = (SdbUpdateFp)mndUserActionUpdate, .updateFp = (SdbUpdateFp)mndUserActionUpdate,
.deleteFp = (SdbDeleteFp)mndUserActionDelete}; .deleteFp = (SdbDeleteFp)mndUserActionDelete};
sdbSetTable(table); sdbSetTable(pMnode->pSdb, table);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_USER, mndProcessCreateUserMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_USER, mndProcessCreateUserMsg);
......
...@@ -46,6 +46,12 @@ int64_t mndGetClusterId(SMnode *pMnode) { ...@@ -46,6 +46,12 @@ int64_t mndGetClusterId(SMnode *pMnode) {
return -1; return -1;
} }
tmr_h mndGetTimer(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->timer;
}
}
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendMsgToDnodeFp != NULL) { if (pMnode != NULL && pMnode->sendMsgToDnodeFp != NULL) {
(*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg); (*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
...@@ -83,33 +89,42 @@ static void mndCleanupTimer(SMnode *pMnode) { ...@@ -83,33 +89,42 @@ static void mndCleanupTimer(SMnode *pMnode) {
} }
} }
tmr_h mndGetTimer(SMnode *pMnode) { static int32_t mnodeCreateDir(SMnode *pMnode, const char *path) {
if (pMnode != NULL) { pMnode->path = strdup(path);
return pMnode->timer; if (pMnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
if (taosMkDir(pMnode->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
} }
return 0;
} }
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { static int32_t mndInitSdb(SMnode *pMnode) {
pMnode->dnodeId = pOption->dnodeId; SSdbOpt opt = {0};
pMnode->clusterId = pOption->clusterId; opt.path = pMnode->path;
pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pDnode = pOption->pDnode;
pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || pMnode->pSdb = sdbOpen(&opt);
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { if (pMnode->pSdb == NULL) {
terrno = TSDB_CODE_MND_APP_ERROR;
return -1; return -1;
} }
return 0; return 0;
} }
static int32_t mndDeploySdb(SMnode *pMnode) { return sdbDeploy(pMnode->pSdb); }
static void mndCleanupSdb(SMnode *pMnode) {
if (pMnode->pSdb) {
sdbClose(pMnode->pSdb);
pMnode->pSdb = NULL;
}
}
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) { static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
SMnodeStep step = {0}; SMnodeStep step = {0};
step.name = name; step.name = name;
...@@ -125,33 +140,28 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle ...@@ -125,33 +140,28 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
} }
static int32_t mndInitSteps(SMnode *pMnode) { static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1; if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1; if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1; if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1; if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-stable", mndInitStable, mndCleanupStable) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stable", mndInitStable, mndCleanupStable) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-sdb", sdbInit, sdbCleanup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return terrno;
if (pMnode->clusterId <= 0) {
if (pMnode->replica == 1) { if (mndAllocStep(pMnode, "mnode-deploy", mndDeploySdb, NULL) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-deploy-sdb", sdbDeploy, sdbClose) != 0) return -1; }
} else { if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-open-sdb", sdbOpen, sdbClose) != 0) return -1; if (mndAllocStep(pMnode, "mnode-balance", mndInitBalance, mndCleanupBalance) != 0) return terrno;
} if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-sdb-file", sdbOpen, sdbClose) != 0) return -1; if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-balance", mndInitBalance, mndCleanupBalance) != 0) return -1; if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return terrno;
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;
return 0; return 0;
} }
...@@ -194,10 +204,39 @@ static int32_t mndExecSteps(SMnode *pMnode) { ...@@ -194,10 +204,39 @@ static int32_t mndExecSteps(SMnode *pMnode) {
} }
} }
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->dnodeId = pOption->dnodeId;
pMnode->clusterId = pOption->clusterId;
pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pDnode = pOption->pDnode;
pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_APP_ERROR;
return terrno;
}
return 0;
}
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
SMnode *pMnode = calloc(1, sizeof(SMnode)); SMnode *pMnode = calloc(1, sizeof(SMnode));
int32_t code = mndSetOptions(pMnode, pOption); int32_t code = mnodeCreateDir(pMnode, path);
if (code != 0) {
mError("failed to set mnode options since %s", terrstr());
mndClose(pMnode);
terrno = code;
return NULL;
}
code = mndSetOptions(pMnode, pOption);
if (code != 0) { if (code != 0) {
mndClose(pMnode); mndClose(pMnode);
terrno = code; terrno = code;
...@@ -227,7 +266,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { ...@@ -227,7 +266,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
void mndClose(SMnode *pMnode) { void mndClose(SMnode *pMnode) {
mndCleanupSteps(pMnode, -1); mndCleanupSteps(pMnode, -1);
free(pMnode); tfree(pMnode->path);
tfree(pMnode);
mDebug("mnode:%p object is cleaned up", pMnode); mDebug("mnode:%p object is cleaned up", pMnode);
} }
......
...@@ -19,27 +19,31 @@ ...@@ -19,27 +19,31 @@
SSdb tsSdb = {0}; SSdb tsSdb = {0};
int32_t sdbInit() { SSdb *sdbOpen(SSdbOpt *pOption) {
char path[PATH_MAX + 100]; SSdb *pSdb = calloc(1, sizeof(SSdb));
if (pSdb == NULL) {
snprintf(path, PATH_MAX + 100, "%s%scur%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); terrno = TSDB_CODE_OUT_OF_MEMORY;
tsSdb.currDir = strdup(path); return NULL;
}
snprintf(path, PATH_MAX + 100, "%s%ssync%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
tsSdb.syncDir = strdup(path);
snprintf(path, PATH_MAX + 100, "%s%stmp%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
tsSdb.tmpDir = strdup(path);
if (tsSdb.currDir == NULL || tsSdb.currDir == NULL || tsSdb.currDir == NULL) { char path[PATH_MAX + 100];
return TSDB_CODE_OUT_OF_MEMORY; snprintf(path, PATH_MAX + 100, "%s%scur%s", pOption->path, TD_DIRSEP, TD_DIRSEP);
pSdb->currDir = strdup(path);
snprintf(path, PATH_MAX + 100, "%s%ssync%s", pOption->path, TD_DIRSEP, TD_DIRSEP);
pSdb->syncDir = strdup(path);
snprintf(path, PATH_MAX + 100, "%s%stmp%s", pOption->path, TD_DIRSEP, TD_DIRSEP);
pSdb->tmpDir = strdup(path);
if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) {
sdbClose(pSdb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} }
for (int32_t i = 0; i < SDB_MAX; ++i) { for (int32_t i = 0; i < SDB_MAX; ++i) {
int32_t type; int32_t type;
if (tsSdb.keyTypes[i] == SDB_KEY_INT32) { if (pSdb->keyTypes[i] == SDB_KEY_INT32) {
type = TSDB_DATA_TYPE_INT; type = TSDB_DATA_TYPE_INT;
} else if (tsSdb.keyTypes[i] == SDB_KEY_INT64) { } else if (pSdb->keyTypes[i] == SDB_KEY_INT64) {
type = TSDB_DATA_TYPE_BIGINT; type = TSDB_DATA_TYPE_BIGINT;
} else { } else {
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
...@@ -47,45 +51,47 @@ int32_t sdbInit() { ...@@ -47,45 +51,47 @@ int32_t sdbInit() {
SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK); SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK);
if (hash == NULL) { if (hash == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; sdbClose(pSdb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} }
tsSdb.hashObjs[i] = hash; pSdb->hashObjs[i] = hash;
taosInitRWLatch(&tsSdb.locks[i]); taosInitRWLatch(&pSdb->locks[i]);
} }
return 0; return 0;
} }
void sdbCleanup() { void sdbClose(SSdb *pSdb) {
if (tsSdb.currDir != NULL) { if (pSdb->currDir != NULL) {
tfree(tsSdb.currDir); tfree(pSdb->currDir);
} }
if (tsSdb.syncDir != NULL) { if (pSdb->syncDir != NULL) {
tfree(tsSdb.syncDir); tfree(pSdb->syncDir);
} }
if (tsSdb.tmpDir != NULL) { if (pSdb->tmpDir != NULL) {
tfree(tsSdb.tmpDir); tfree(pSdb->tmpDir);
} }
for (int32_t i = 0; i < SDB_MAX; ++i) { for (int32_t i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = tsSdb.hashObjs[i]; SHashObj *hash = pSdb->hashObjs[i];
if (hash != NULL) { if (hash != NULL) {
taosHashCleanup(hash); taosHashCleanup(hash);
} }
tsSdb.hashObjs[i] = NULL; pSdb->hashObjs[i] = NULL;
} }
} }
void sdbSetTable(SSdbTable table) { void sdbSetTable(SSdb *pSdb, SSdbTable table) {
ESdbType sdb = table.sdbType; ESdbType sdb = table.sdbType;
tsSdb.keyTypes[sdb] = table.keyType; pSdb->keyTypes[sdb] = table.keyType;
tsSdb.insertFps[sdb] = table.insertFp; pSdb->insertFps[sdb] = table.insertFp;
tsSdb.updateFps[sdb] = table.updateFp; pSdb->updateFps[sdb] = table.updateFp;
tsSdb.deleteFps[sdb] = table.deleteFp; pSdb->deleteFps[sdb] = table.deleteFp;
tsSdb.deployFps[sdb] = table.deployFp; pSdb->deployFps[sdb] = table.deployFp;
tsSdb.encodeFps[sdb] = table.encodeFp; pSdb->encodeFps[sdb] = table.encodeFp;
tsSdb.decodeFps[sdb] = table.decodeFp; pSdb->decodeFps[sdb] = table.decodeFp;
} }
\ No newline at end of file
...@@ -212,29 +212,29 @@ static int32_t sdbWriteDataFile() { ...@@ -212,29 +212,29 @@ static int32_t sdbWriteDataFile() {
return code; return code;
} }
int32_t sdbOpen() { // int32_t sdbOpen() {
mDebug("start to read mnode file"); // mDebug("start to read mnode file");
if (sdbReadDataFile() != 0) { // if (sdbReadDataFile() != 0) {
return -1; // return -1;
} // }
return 0; // return 0;
} // }
void sdbClose() { // void sdbClose() {
if (tsSdb.curVer != tsSdb.lastCommitVer) { // if (tsSdb.curVer != tsSdb.lastCommitVer) {
mDebug("start to write mnode file"); // mDebug("start to write mnode file");
sdbWriteDataFile(); // sdbWriteDataFile();
} // }
for (int32_t i = 0; i < SDB_MAX; ++i) { // for (int32_t i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = tsSdb.hashObjs[i]; // SHashObj *hash = tsSdb.hashObjs[i];
if (hash != NULL) { // if (hash != NULL) {
taosHashClear(hash); // taosHashClear(hash);
} // }
} // }
} // }
int32_t sdbDeploy() { int32_t sdbDeploy() {
if (sdbCreateDir() != 0) { if (sdbCreateDir() != 0) {
...@@ -249,7 +249,7 @@ int32_t sdbDeploy() { ...@@ -249,7 +249,7 @@ int32_t sdbDeploy() {
return -1; return -1;
} }
sdbClose(); // sdbClose();
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册