From 4271952deaf1d1c0ea3d4ec4ccb2e54352def555 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 23 Mar 2020 15:52:56 +0800 Subject: [PATCH] [TD-15] refact the code to delete the database --- src/mnode/src/mgmtDb.c | 340 ++++++++++++++++++++++----------------- src/mnode/src/mgmtSdb.c | 197 +---------------------- src/mnode/src/mgmtUser.c | 13 +- 3 files changed, 209 insertions(+), 341 deletions(-) diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index e502d82a19..a4eeb88d91 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -15,11 +15,15 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "mgmtDb.h" +#include "taoserror.h" +#include "tstatus.h" +#include "tutil.h" +#include "name.h" +#include "mnode.h" #include "mgmtAcct.h" #include "mgmtBalance.h" #include "mgmtChildTable.h" +#include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtGrant.h" #include "mgmtShell.h" @@ -31,19 +35,13 @@ #include "mgmtTable.h" #include "mgmtUser.h" #include "mgmtVgroup.h" -#include "mnode.h" - -#include "taoserror.h" -#include "tstatus.h" -#include "tutil.h" -#include "name.h" static void *tsDbSdb = NULL; static int32_t tsDbUpdateSize; static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); static void mgmtDropDb(void *handle, void *tmrId); -static void mgmtSetDbDirty(SDbObj *pDb); +static int32_t mgmtSetDbDirty(SDbObj *pDb); static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -51,64 +49,73 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg); static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg); -static int32_t mgmtDbActionDestroy(void *pObj) { - tfree(pObj); +static int32_t mgmtDbActionDestroy(SSdbOperDesc *pOper) { + tfree(pOper->pObj); return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionInsert(void *pObj) { - SDbObj *pDb = (SDbObj *) pObj; +static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) { + SDbObj *pDb = pOper->pObj; SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); pDb->pHead = NULL; pDb->pTail = NULL; + pDb->prev = NULL; + pDb->next = NULL; pDb->numOfVgroups = 0; pDb->numOfTables = 0; - mgmtAddDbIntoAcct(pAcct, pDb); + pDb->numOfSuperTables = 0; + + if (pAcct != NULL) { + mgmtAddDbIntoAcct(pAcct, pDb); + } + else { + mError("db:%s, acct:%s info not exist in sdb", pDb->name, pDb->cfg.acct); + return TSDB_CODE_INVALID_ACCT; + } return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionDelete(void *pObj) { - SDbObj *pDb = (SDbObj *) pObj; +static int32_t mgmtDbActionDelete(SSdbOperDesc *pOper) { + SDbObj *pDb = pOper->pObj; SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - mgmtRemoveDbFromAcct(pAcct, pDb); + mgmtRemoveDbFromAcct(pAcct, pDb); mgmtDropAllNormalTables(pDb); mgmtDropAllChildTables(pDb); mgmtDropAllSuperTables(pDb); - + mgmtDropAllVgroups(pDb); + return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionUpdate(void *pObj) { +static int32_t mgmtDbActionUpdate(SSdbOperDesc *pOper) { return TSDB_CODE_SUCCESS; } -static int32_t mgmtDbActionEncode(void *pObj, void *pData, int32_t maxRowSize) { - SDbObj *pDb = (SDbObj *)pObj; - if (maxRowSize < tsDbUpdateSize) { +static int32_t mgmtDbActionEncode(SSdbOperDesc *pOper) { + SDbObj *pDb = pOper->pObj; + + if (pOper->maxRowSize < tsDbUpdateSize) { return -1; } else { - memcpy(pData, pDb, tsDbUpdateSize); - return tsDbUpdateSize; + memcpy(pOper->rowData, pDb, tsDbUpdateSize); + pOper->rowSize = tsDbUpdateSize; + return TSDB_CODE_SUCCESS; } } -static void *mgmtDbActionDecode(void *pData) { - SDbObj *pDb = (SDbObj *) malloc(sizeof(SDbObj)); - if (pDb == NULL) return NULL; - memset(pDb, 0, sizeof(SDbObj)); - memcpy(pDb, pData, tsDbUpdateSize); - - return (void *)pDb; +static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) { + SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj)); + if (pDb == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; + + memcpy(pDb, pOper->rowData, tsDbUpdateSize); + pOper->pObj = pDb; + return TSDB_CODE_SUCCESS; } int32_t mgmtInitDbs() { - void * pNode = NULL; - SDbObj * pDb = NULL; - SAcctObj *pAcct = NULL; - SDbObj tObj; tsDbUpdateSize = tObj.updateEnd - (char *)&tObj; @@ -116,7 +123,7 @@ int32_t mgmtInitDbs() { .tableName = "dbs", .hashSessions = TSDB_MAX_DBS, .maxRowSize = tsDbUpdateSize, - .keyType = SDB_KEYTYPE_STRING, + .keyType = SDB_KEY_TYPE_STRING, .insertFp = mgmtDbActionInsert, .deleteFp = mgmtDbActionDelete, .updateFp = mgmtDbActionUpdate, @@ -131,25 +138,6 @@ int32_t mgmtInitDbs() { return -1; } - while (1) { - pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb); - if (pDb == NULL) break; - - pDb->pHead = NULL; - pDb->pTail = NULL; - pDb->prev = NULL; - pDb->next = NULL; - pDb->numOfTables = 0; - pDb->numOfVgroups = 0; - pDb->numOfSuperTables = 0; - pAcct = mgmtGetAcct(pDb->cfg.acct); - if (pAcct != NULL) - mgmtAddDbIntoAcct(pAcct, pDb); - else { - mError("db:%s acct:%s info not exist in sdb", pDb->name, pDb->cfg.acct); - } - } - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mgmtProcessCreateDbMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mgmtProcessAlterDbMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mgmtProcessDropDbMsg); @@ -250,14 +238,14 @@ static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) { static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) { // assign default parameters - if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode; // - if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize; // - if (pCreate->daysPerFile < 0) pCreate->daysPerFile = tsDaysPerFile; // - if (pCreate->daysToKeep < 0) pCreate->daysToKeep = tsDaysToKeep; // - if (pCreate->daysToKeep1 < 0) pCreate->daysToKeep1 = pCreate->daysToKeep; // - if (pCreate->daysToKeep2 < 0) pCreate->daysToKeep2 = pCreate->daysToKeep; // - if (pCreate->commitTime < 0) pCreate->commitTime = tsCommitTime; // - if (pCreate->compression < 0) pCreate->compression = tsCompression; // + if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode; + if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize; + if (pCreate->daysPerFile < 0) pCreate->daysPerFile = tsDaysPerFile; + if (pCreate->daysToKeep < 0) pCreate->daysToKeep = tsDaysToKeep; + if (pCreate->daysToKeep1 < 0) pCreate->daysToKeep1 = pCreate->daysToKeep; + if (pCreate->daysToKeep2 < 0) pCreate->daysToKeep2 = pCreate->daysToKeep; + if (pCreate->commitTime < 0) pCreate->commitTime = tsCommitTime; + if (pCreate->compression < 0) pCreate->compression = tsCompression; if (pCreate->commitLog < 0) pCreate->commitLog = tsCommitLog; if (pCreate->replications < 0) pCreate->replications = tsReplications; // if (pCreate->rowsInFileBlock < 0) pCreate->rowsInFileBlock = tsRowsInFileBlock; // @@ -321,9 +309,17 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { pDb->createdTime = taosGetTimestampMs(); pDb->cfg = *pCreate; - if (sdbInsertRow(tsDbSdb, pDb, SDB_OPER_GLOBAL) < 0) { - code = TSDB_CODE_SDB_ERROR; + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsDbSdb, + .pObj = pDb, + .rowSize = sizeof(SDbObj) + }; + + code = sdbInsertRow(&oper); + if (code != TSDB_CODE_SUCCESS) { tfree(pDb); + code = TSDB_CODE_SDB_ERROR; } return code; @@ -337,72 +333,6 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb)); } -static int32_t mgmtAlterDb(SAcctObj *pAcct, SCMAlterDbMsg *pAlter) { - return 0; -// int32_t code = TSDB_CODE_SUCCESS; -// -// SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db); -// if (pDb == NULL) { -// mTrace("db:%s is not exist", pAlter->db); -// return TSDB_CODE_INVALID_DB; -// } -// -// int32_t oldReplicaNum = pDb->cfg.replications; -// if (pAlter->daysToKeep > 0) { -// mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep); -// pDb->cfg.daysToKeep = pAlter->daysToKeep; -// } else if (pAlter->replications > 0) { -// mTrace("db:%s replica:%d change to %d", pDb->name, pDb->cfg.replications, pAlter->replications); -// if (pAlter->replications < TSDB_REPLICA_MIN_NUM || pAlter->replications > TSDB_REPLICA_MAX_NUM) { -// mError("invalid db option replica: %d valid range: %d--%d", pAlter->replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM); -// return TSDB_CODE_INVALID_OPTION; -// } -// pDb->cfg.replications = pAlter->replications; -// } else if (pAlter->maxSessions > 0) { -// mTrace("db:%s tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, pAlter->maxSessions); -// if (pAlter->maxSessions < TSDB_MIN_TABLES_PER_VNODE || pAlter->maxSessions > TSDB_MAX_TABLES_PER_VNODE) { -// mError("invalid db option tables: %d valid range: %d--%d", pAlter->maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE); -// return TSDB_CODE_INVALID_OPTION; -// } -// if (pAlter->maxSessions < pDb->cfg.maxSessions) { -// mError("invalid db option tables: %d should larger than original:%d", pAlter->maxSessions, pDb->cfg.maxSessions); -// return TSDB_CODE_INVALID_OPTION; -// } -// return TSDB_CODE_INVALID_OPTION; -// //The modification of tables needs to rewrite the head file, so disable this option -// //pDb->cfg.maxSessions = pAlter->maxSessions; -// } else { -// mError("db:%s alter msg, replica:%d, keep:%d, tables:%d, origin replica:%d keep:%d", pDb->name, -// pAlter->replications, pAlter->maxSessions, pAlter->daysToKeep, -// pDb->cfg.replications, pDb->cfg.daysToKeep); -// return TSDB_CODE_INVALID_OPTION; -// } -// -// if (sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1) < 0) { -// return TSDB_CODE_SDB_ERROR; -// } -// -// SVgObj *pVgroup = pDb->pHead; -// while (pVgroup != NULL) { -// balanceUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); -// if (oldReplicaNum < pDb->cfg.replications) { -// if (!balanceAddVnode(pVgroup, NULL, NULL)) { -// mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); -// code = TSDB_CODE_NO_ENOUGH_DNODES; -// } -// } -// if (pAlter->maxSessions > 0) { -// //rebuild meterList in mgmtVgroup.c -// mgmtUpdateVgroup(pVgroup); -// } -//// mgmtSendCreateVnodeMsg(pVgroup); -// pVgroup = pVgroup->next; -// } -// mgmtStartBalanceTimer(10); -// -// return code; -} - int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) { pVgroup->next = pDb->pHead; pVgroup->prev = NULL; @@ -472,7 +402,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "created_time"); + strcpy(pSchema[cols].name, "create time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -600,7 +530,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) return 0; } -char *mgmtGetDbStr(char *src) { +static char *mgmtGetDbStr(char *src) { char *pos = strstr(src, TS_PATH_DELIMITER); return ++pos; @@ -740,8 +670,23 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfTables, -1); } -static void mgmtSetDbDirty(SDbObj *pDb) { +static int32_t mgmtSetDbDirty(SDbObj *pDb) { + if (pDb->dirty) return TSDB_CODE_SUCCESS; + pDb->dirty = true; + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsDbSdb, + .pObj = pDb, + .rowSize = tsDbUpdateSize + }; + + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SDB_ERROR; + } + + return code; } static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { @@ -766,32 +711,121 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { } else { code = mgmtCreateDb(pMsg->pUser->pAcct, pCreate); if (code == TSDB_CODE_SUCCESS) { - mLPrint("DB:%s is created by %s", pCreate->db, pMsg->pUser->user); + mLPrint("db:%s, is created by %s", pCreate->db, pMsg->pUser->user); } } mgmtSendSimpleResp(pMsg->thandle, code); } +static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { + SDbCfg newCfg = pDb->cfg; + int32_t daysToKeep = htonl(pAlter->daysToKeep); + int32_t maxSessions = htonl(pAlter->maxSessions) + 1; + int8_t replications = pAlter->replications; + + terrno = TSDB_CODE_SUCCESS; + + if (daysToKeep > 0 && daysToKeep != pDb->cfg.daysToKeep) { + mTrace("db:%s, daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, daysToKeep); + newCfg.daysToKeep = daysToKeep; + } else if (replications > 0 && replications != pDb->cfg.replications) { + mTrace("db:%s, replica:%d change to %d", pDb->name, pDb->cfg.replications, replications); + if (replications < TSDB_REPLICA_MIN_NUM || replications > TSDB_REPLICA_MAX_NUM) { + mError("invalid db option replica: %d valid range: %d--%d", replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM); + terrno = TSDB_CODE_INVALID_OPTION; + } + newCfg.replications = replications; + } else if (maxSessions > 0 && maxSessions != pDb->cfg.maxSessions) { + mTrace("db:%s, tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, maxSessions); + if (maxSessions < TSDB_MIN_TABLES_PER_VNODE || maxSessions > TSDB_MAX_TABLES_PER_VNODE) { + mError("invalid db option tables: %d valid range: %d--%d", maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE); + terrno = TSDB_CODE_INVALID_OPTION; + } + if (maxSessions < pDb->cfg.maxSessions) { + mError("invalid db option tables: %d should larger than original:%d", maxSessions, pDb->cfg.maxSessions); + terrno = TSDB_CODE_INVALID_OPTION; + } + newCfg.maxSessions = maxSessions; + } else { + } + + return newCfg; +} + +static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { + SDbCfg newCfg = mgmtGetAlterDbOption(pDb, pAlter); + if (terrno != TSDB_CODE_SUCCESS) { + return terrno; + } + + if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { + pDb->cfg = newCfg; + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsDbSdb, + .pObj = pDb, + .rowSize = tsDbUpdateSize + }; + + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SDB_ERROR; + } + } + + return TSDB_CODE_SUCCESS; +} + static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; SCMAlterDbMsg *pAlter = pMsg->pCont; - pAlter->daysPerFile = htonl(pAlter->daysPerFile); - pAlter->daysToKeep = htonl(pAlter->daysToKeep); - pAlter->maxSessions = htonl(pAlter->maxSessions) + 1; + mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle); + + if (mgmtCheckExpired()) { + mError("db:%s, failed to alter, grant expired", pAlter->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); + return; + } - int32_t code; if (!pMsg->pUser->writeAuth) { - code = TSDB_CODE_NO_RIGHTS; - } else { - code = mgmtAlterDb(pMsg->pUser->pAcct, pAlter); - if (code == TSDB_CODE_SUCCESS) { - mLPrint("DB:%s is altered by %s", pAlter->db, pMsg->pUser->user); - } + mError("db:%s, failed to alter, no rights", pAlter->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); + return; } - mgmtSendSimpleResp(pMsg->thandle, code); + SDbObj *pDb = mgmtGetDb(pAlter->db); + if (pDb == NULL) { + mError("db:%s, failed to alter, invalid db", pAlter->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); + return; + } + + int32_t code = mgmtAlterDb(pDb, pAlter); + if (code != TSDB_CODE_SUCCESS) { + mError("db:%s, failed to alter, invalid db option", pAlter->db); + mgmtSendSimpleResp(pMsg->thandle, code); + } + + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; + + SVgObj *pVgroup = pDb->pHead; + if (pVgroup != NULL) { + mPrint("vgroup:%d, will be altered", pVgroup->vgId); + newMsg->ahandle = pVgroup; + newMsg->expected = pVgroup->numOfVnodes; + mgmtAlterVgroup(pVgroup, newMsg); + return; + } + + mTrace("db:%s, all vgroups is altered", pDb->name); + + mgmtSendSimpleResp(newMsg->thandle, TSDB_CODE_SUCCESS); + rpcFreeCont(newMsg->pCont); + free(newMsg); } static void mgmtDropDb(void *handle, void *tmrId) { @@ -799,7 +833,12 @@ static void mgmtDropDb(void *handle, void *tmrId) { SDbObj *pDb = newMsg->ahandle; mPrint("db:%s, drop db from sdb", pDb->name); - int32_t code = sdbDeleteRow(tsDbSdb, pDb, SDB_OPER_GLOBAL); + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsDbSdb, + .pObj = pDb + }; + int32_t code = sdbDeleteRow(&oper); if (code != 0) { code = TSDB_CODE_SDB_ERROR; } @@ -846,7 +885,12 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { return; } - mgmtSetDbDirty(pDb); + int32_t code = mgmtSetDbDirty(pDb); + if (code != TSDB_CODE_SUCCESS) { + mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code)); + mgmtSendSimpleResp(pMsg->thandle, code); + return; + } SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index ea05cb59f4..eee8c0f1a0 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -104,9 +104,6 @@ static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, s static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash}; static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData}; -void sdbResetTable(SSdbTable *pTable); -void sdbSaveSnapShot(void *handle); - uint64_t sdbGetVersion() { return sdbVersion; } int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->version; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } @@ -388,10 +385,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) { pTable->autoIndex = maxAutoIndex; } - if (numOfChanged > pTable->hashSessions / 4) { - sdbSaveSnapShot(pTable); - } - tfree(rowHead); return 0; } @@ -538,7 +531,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { pthread_mutex_unlock(&pTable->mutex); - sdbTrace("table:%s, a record is inserted:%s, sdbversion:%" PRId64 " version:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64, + sdbTrace("table:%s, inserte record:%s, sdbversion:%" PRId64 " version:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64, pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pOper->rowSize, pTable->numOfRows, pTable->fileSize); (*pTable->insertFp)(pOper); @@ -578,7 +571,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { } } - int32_t total_size = sizeof(SRowHead) + pOper->rowSize + sizeof(TSCKSUM); + int32_t total_size = sizeof(SRowHead) + pMeta->rowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)calloc(1, total_size); if (rowHead == NULL) { sdbError("failed to allocate row head memory, sdb:%s", pTable->tableName); @@ -621,7 +614,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { tfree(rowHead); - sdbTrace("table:%s, a record is deleted:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%d", + sdbTrace("table:%s, delete record:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows); // Delete from current layer @@ -644,7 +637,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->pObj); if (pMeta == NULL) { - sdbError("table:%s, failed to update record:%s, record is not there, sdbversion:%" PRId64 " id:%" PRId64, + sdbError("table:%s, failed to update record:%s, record is not there, sdbversion:%" PRId64 " version:%" PRId64, pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version); return -1; } @@ -697,7 +690,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { rowHead->delimiter = SDB_DELIMITER; rowHead->version = pTable->version; if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { - sdbError("failed to get checksum, sdb:%s version:%d", pTable->tableName, rowHead->version); + sdbError("table:%s, failed to get checksum, version:%d", pTable->tableName, rowHead->version); pTable->version--; sdbVersion--; pthread_mutex_unlock(&pTable->mutex); @@ -709,7 +702,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { pTable->fileSize += real_size; sdbFinishCommit(pTable); - sdbTrace("table:%s, a record is updated:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, + sdbTrace("table:%s, update record:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%" PRId64, pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows); pMeta->version = pTable->version; @@ -750,187 +743,11 @@ void sdbCloseTable(void *handle) { pthread_mutex_destroy(&pTable->mutex); sdbNumOfTables--; - sdbTrace("table:%s is closed, id:%" PRId64 " numOfTables:%d", pTable->tableName, pTable->version, sdbNumOfTables); + sdbTrace("table:%s, is closed, version:%" PRId64 " numOfTables:%d", pTable->tableName, pTable->version, sdbNumOfTables); tfree(pTable); } -void sdbResetTable(SSdbTable *pTable) { - /* - SRowMeta rowMeta; - int32_t bytes; - int32_t total_size = 0; - int32_t real_size = 0; - SRowHead *rowHead = NULL; - void * pMetaRow = NULL; - int64_t oldId = pTable->version; - int32_t oldNumOfRows = pTable->numOfRows; - - if (sdbOpenSdbFile(pTable) < 0) return; - pTable->numOfRows = oldNumOfRows; - - total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); - rowHead = (SRowHead *)malloc(total_size); - if (rowHead == NULL) { - sdbError("failed to allocate row head memory for reset, sdb:%s", pTable->tableName); - return; - } - - sdbPrint("open sdb file:%s for reset table", pTable->fileName); - - while (1) { - memset(rowHead, 0, total_size); - - bytes = read(pTable->fd, rowHead, sizeof(SRowHead)); - if (bytes < 0) { - sdbError("failed to read sdb file:%s", pTable->fileName); - tfree(rowHead); - return; - } - - if (bytes == 0) break; - - if (bytes < sizeof(SRowHead) || rowHead->delimiter != SDB_DELIMITER) { - pTable->fileSize++; - lseek(pTable->fd, -(bytes - 1), SEEK_CUR); - continue; - } - - if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) { - sdbError("error row size in sdb file:%s for reset, version:%d rowSize:%d maxRowSize:%d", - pTable->fileName, rowHead->version, rowHead->rowSize, pTable->maxRowSize); - pTable->fileSize += sizeof(SRowHead); - continue; - } - - bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM)); - if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) { - sdbError("failed to read sdb file:%s for reset, version:%d rowSize:%d", pTable->fileName, rowHead->version, rowHead->rowSize); - break; - } - - real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); - if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) { - sdbError("error sdb checksum, sdb:%s version:%d, skip", pTable->tableName, rowHead->version); - pTable->fileSize += real_size; - continue; - } - - if (abs(rowHead->version) > oldId) { // not operated - pMetaRow = sdbGetRow(pTable, rowHead->data); - if (pMetaRow == NULL) { // New object - if (rowHead->version < 0) { - sdbError("error sdb negative version:%d, sdb:%s, skip", rowHead->version, pTable->tableName); - } else { - rowMeta.version = rowHead->version; - // TODO:Get rid of the rowMeta.offset and rowSize - rowMeta.offset = pTable->fileSize; - rowMeta.rowSize = rowHead->rowSize; - rowMeta.row = (*pTable->decodeFp)(rowHead->data); - (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); - pTable->numOfRows++; - - (*pTable->insertFp)(rowMeta.row); - } - } else { // already exists - if (rowHead->version < 0) { // Delete the object - (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); - (*pTable->destroyFp)(pMetaRow); - pTable->numOfRows--; - } else { // update the object - (*pTable->updateFp)(pMetaRow); - } - } - } - - pTable->fileSize += real_size; - if (pTable->version < abs(rowHead->version)) pTable->version = abs(rowHead->version); - } - - sdbVersion += (pTable->version - oldId); - - tfree(rowHead); - - sdbPrint("table:%s is updated, sdbVerion:%" PRId64 " id:%" PRId64, pTable->tableName, sdbVersion, pTable->version); - */ -} - -// TODO:A problem here :use snapshot file to sync another node will cause problem -void sdbSaveSnapShot(void *handle) { - /* - SSdbTable *pTable = (SSdbTable *)handle; - SRowMeta * pMeta; - void * pNode = NULL; - int32_t total_size = 0; - int32_t real_size = 0; - int32_t size = 0; - int32_t numOfRows = 0; - uint32_t sdbEcommit = SDB_ENDCOMMIT; - char * dirc = NULL; - char * basec = NULL; - - if (pTable == NULL) return; - - sdbTrace("Table:%s, save the snapshop", pTable->tableName); - - char fn[128] = "\0"; - dirc = strdup(pTable->fileName); - basec = strdup(pTable->fileName); - sprintf(fn, "%s/.%s", dirname(dirc), basename(basec)); - int32_t fd = open(fn, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); - tfree(dirc); - tfree(basec); - - total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); - SRowHead *rowHead = (SRowHead *)malloc(total_size); - if (rowHead == NULL) { - sdbError("failed to allocate memory while saving SDB snapshot, sdb:%s", pTable->tableName); - return; - } - memset(rowHead, 0, size); - - // Write the header - twrite(fd, &(pTable->header), sizeof(SSdbHeader)); - size += sizeof(SSdbHeader); - twrite(fd, &sdbEcommit, sizeof(sdbEcommit)); - size += sizeof(sdbEcommit); - - while (1) { - pNode = (*sdbFetchRowFp[pTable->keyType])(pTable->iHandle, pNode, (void **)&pMeta); - if (pMeta == NULL) break; - - rowHead->delimiter = SDB_DELIMITER; - rowHead->version = pMeta->id; - rowHead->rowSize = (*pTable->encodeFp)(pMeta->row, rowHead->data, pTable->maxRowSize); - real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); - if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { - sdbError("failed to get checksum while save sdb %s snapshot", pTable->tableName); - tfree(rowHead); - return; - } - - twrite(fd, rowHead, real_size); - size += real_size; - twrite(fd, &sdbEcommit, sizeof(sdbEcommit)); - size += sizeof(sdbEcommit); - numOfRows++; - } - - tfree(rowHead); - - // Remove the old file - tclose(pTable->fd); - remove(pTable->fileName); - // Rename the .sdb.db file to sdb.db file - rename(fn, pTable->fileName); - pTable->fd = fd; - pTable->fileSize = size; - pTable->numOfRows = numOfRows; - - fdatasync(pTable->fd); - */ -} - void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { SSdbTable *pTable = (SSdbTable *)handle; SRowMeta * pMeta; diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 11285764b8..e55ba9ea60 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -46,7 +46,15 @@ static int32_t mgmtUserActionDestroy(SSdbOperDesc *pOper) { static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) { SUserObj *pUser = pOper->pObj; SAcctObj *pAcct = mgmtGetAcct(pUser->acct); - mgmtAddUserIntoAcct(pAcct, pUser); + + if (pAcct != NULL) { + mgmtAddUserIntoAcct(pAcct, pUser); + } + else { + mError("user:%s, acct:%s info not exist in sdb", pUser->user, pUser->acct); + return TSDB_CODE_INVALID_ACCT; + } + return TSDB_CODE_SUCCESS; } @@ -77,7 +85,7 @@ static int32_t mgmtUserActionEncode(SSdbOperDesc *pOper) { static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { SUserObj *pUser = (SUserObj *) calloc(1, sizeof(SUserObj)); - if (pUser == NULL) return -1; + if (pUser == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; memcpy(pUser, pOper->rowData, tsUserUpdateSize); pOper->pObj = pUser; @@ -216,7 +224,6 @@ static int32_t mgmtDropUser(SAcctObj *pAcct, char *name) { int32_t code = sdbDeleteRow(&oper); if (code != TSDB_CODE_SUCCESS) { - tfree(pUser); code = TSDB_CODE_SDB_ERROR; } -- GitLab