提交 4271952d 编写于 作者: S slguan

[TD-15] refact the code to delete the database

上级 ddd05d9c
......@@ -15,11 +15,15 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mgmtDb.h"
#include "taoserror.h"
#include "tstatus.h"
#include "tutil.h"
#include "name.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtBalance.h"
#include "mgmtChildTable.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtGrant.h"
#include "mgmtShell.h"
......@@ -31,19 +35,13 @@
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
#include "mnode.h"
#include "taoserror.h"
#include "tstatus.h"
#include "tutil.h"
#include "name.h"
static void *tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static void mgmtDropDb(void *handle, void *tmrId);
static void mgmtSetDbDirty(SDbObj *pDb);
static int32_t mgmtSetDbDirty(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -51,64 +49,73 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg);
static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg);
static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg);
static int32_t mgmtDbActionDestroy(void *pObj) {
tfree(pObj);
static int32_t mgmtDbActionDestroy(SSdbOperDesc *pOper) {
tfree(pOper->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtDbActionInsert(void *pObj) {
SDbObj *pDb = (SDbObj *) pObj;
static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) {
SDbObj *pDb = pOper->pObj;
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
pDb->pHead = NULL;
pDb->pTail = NULL;
pDb->prev = NULL;
pDb->next = NULL;
pDb->numOfVgroups = 0;
pDb->numOfTables = 0;
mgmtAddDbIntoAcct(pAcct, pDb);
pDb->numOfSuperTables = 0;
if (pAcct != NULL) {
mgmtAddDbIntoAcct(pAcct, pDb);
}
else {
mError("db:%s, acct:%s info not exist in sdb", pDb->name, pDb->cfg.acct);
return TSDB_CODE_INVALID_ACCT;
}
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtDbActionDelete(void *pObj) {
SDbObj *pDb = (SDbObj *) pObj;
static int32_t mgmtDbActionDelete(SSdbOperDesc *pOper) {
SDbObj *pDb = pOper->pObj;
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mgmtRemoveDbFromAcct(pAcct, pDb);
mgmtRemoveDbFromAcct(pAcct, pDb);
mgmtDropAllNormalTables(pDb);
mgmtDropAllChildTables(pDb);
mgmtDropAllSuperTables(pDb);
mgmtDropAllVgroups(pDb);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtDbActionUpdate(void *pObj) {
static int32_t mgmtDbActionUpdate(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtDbActionEncode(void *pObj, void *pData, int32_t maxRowSize) {
SDbObj *pDb = (SDbObj *)pObj;
if (maxRowSize < tsDbUpdateSize) {
static int32_t mgmtDbActionEncode(SSdbOperDesc *pOper) {
SDbObj *pDb = pOper->pObj;
if (pOper->maxRowSize < tsDbUpdateSize) {
return -1;
} else {
memcpy(pData, pDb, tsDbUpdateSize);
return tsDbUpdateSize;
memcpy(pOper->rowData, pDb, tsDbUpdateSize);
pOper->rowSize = tsDbUpdateSize;
return TSDB_CODE_SUCCESS;
}
}
static void *mgmtDbActionDecode(void *pData) {
SDbObj *pDb = (SDbObj *) malloc(sizeof(SDbObj));
if (pDb == NULL) return NULL;
memset(pDb, 0, sizeof(SDbObj));
memcpy(pDb, pData, tsDbUpdateSize);
return (void *)pDb;
static int32_t mgmtDbActionDecode(SSdbOperDesc *pOper) {
SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj));
if (pDb == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
memcpy(pDb, pOper->rowData, tsDbUpdateSize);
pOper->pObj = pDb;
return TSDB_CODE_SUCCESS;
}
int32_t mgmtInitDbs() {
void * pNode = NULL;
SDbObj * pDb = NULL;
SAcctObj *pAcct = NULL;
SDbObj tObj;
tsDbUpdateSize = tObj.updateEnd - (char *)&tObj;
......@@ -116,7 +123,7 @@ int32_t mgmtInitDbs() {
.tableName = "dbs",
.hashSessions = TSDB_MAX_DBS,
.maxRowSize = tsDbUpdateSize,
.keyType = SDB_KEYTYPE_STRING,
.keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtDbActionInsert,
.deleteFp = mgmtDbActionDelete,
.updateFp = mgmtDbActionUpdate,
......@@ -131,25 +138,6 @@ int32_t mgmtInitDbs() {
return -1;
}
while (1) {
pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb);
if (pDb == NULL) break;
pDb->pHead = NULL;
pDb->pTail = NULL;
pDb->prev = NULL;
pDb->next = NULL;
pDb->numOfTables = 0;
pDb->numOfVgroups = 0;
pDb->numOfSuperTables = 0;
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL)
mgmtAddDbIntoAcct(pAcct, pDb);
else {
mError("db:%s acct:%s info not exist in sdb", pDb->name, pDb->cfg.acct);
}
}
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mgmtProcessCreateDbMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mgmtProcessAlterDbMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mgmtProcessDropDbMsg);
......@@ -250,14 +238,14 @@ static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) {
static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) {
// assign default parameters
if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode; //
if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize; //
if (pCreate->daysPerFile < 0) pCreate->daysPerFile = tsDaysPerFile; //
if (pCreate->daysToKeep < 0) pCreate->daysToKeep = tsDaysToKeep; //
if (pCreate->daysToKeep1 < 0) pCreate->daysToKeep1 = pCreate->daysToKeep; //
if (pCreate->daysToKeep2 < 0) pCreate->daysToKeep2 = pCreate->daysToKeep; //
if (pCreate->commitTime < 0) pCreate->commitTime = tsCommitTime; //
if (pCreate->compression < 0) pCreate->compression = tsCompression; //
if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode;
if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize;
if (pCreate->daysPerFile < 0) pCreate->daysPerFile = tsDaysPerFile;
if (pCreate->daysToKeep < 0) pCreate->daysToKeep = tsDaysToKeep;
if (pCreate->daysToKeep1 < 0) pCreate->daysToKeep1 = pCreate->daysToKeep;
if (pCreate->daysToKeep2 < 0) pCreate->daysToKeep2 = pCreate->daysToKeep;
if (pCreate->commitTime < 0) pCreate->commitTime = tsCommitTime;
if (pCreate->compression < 0) pCreate->compression = tsCompression;
if (pCreate->commitLog < 0) pCreate->commitLog = tsCommitLog;
if (pCreate->replications < 0) pCreate->replications = tsReplications; //
if (pCreate->rowsInFileBlock < 0) pCreate->rowsInFileBlock = tsRowsInFileBlock; //
......@@ -321,9 +309,17 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
pDb->createdTime = taosGetTimestampMs();
pDb->cfg = *pCreate;
if (sdbInsertRow(tsDbSdb, pDb, SDB_OPER_GLOBAL) < 0) {
code = TSDB_CODE_SDB_ERROR;
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsDbSdb,
.pObj = pDb,
.rowSize = sizeof(SDbObj)
};
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
tfree(pDb);
code = TSDB_CODE_SDB_ERROR;
}
return code;
......@@ -337,72 +333,6 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) {
return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb));
}
static int32_t mgmtAlterDb(SAcctObj *pAcct, SCMAlterDbMsg *pAlter) {
return 0;
// int32_t code = TSDB_CODE_SUCCESS;
//
// SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db);
// if (pDb == NULL) {
// mTrace("db:%s is not exist", pAlter->db);
// return TSDB_CODE_INVALID_DB;
// }
//
// int32_t oldReplicaNum = pDb->cfg.replications;
// if (pAlter->daysToKeep > 0) {
// mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep);
// pDb->cfg.daysToKeep = pAlter->daysToKeep;
// } else if (pAlter->replications > 0) {
// mTrace("db:%s replica:%d change to %d", pDb->name, pDb->cfg.replications, pAlter->replications);
// if (pAlter->replications < TSDB_REPLICA_MIN_NUM || pAlter->replications > TSDB_REPLICA_MAX_NUM) {
// mError("invalid db option replica: %d valid range: %d--%d", pAlter->replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM);
// return TSDB_CODE_INVALID_OPTION;
// }
// pDb->cfg.replications = pAlter->replications;
// } else if (pAlter->maxSessions > 0) {
// mTrace("db:%s tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, pAlter->maxSessions);
// if (pAlter->maxSessions < TSDB_MIN_TABLES_PER_VNODE || pAlter->maxSessions > TSDB_MAX_TABLES_PER_VNODE) {
// mError("invalid db option tables: %d valid range: %d--%d", pAlter->maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE);
// return TSDB_CODE_INVALID_OPTION;
// }
// if (pAlter->maxSessions < pDb->cfg.maxSessions) {
// mError("invalid db option tables: %d should larger than original:%d", pAlter->maxSessions, pDb->cfg.maxSessions);
// return TSDB_CODE_INVALID_OPTION;
// }
// return TSDB_CODE_INVALID_OPTION;
// //The modification of tables needs to rewrite the head file, so disable this option
// //pDb->cfg.maxSessions = pAlter->maxSessions;
// } else {
// mError("db:%s alter msg, replica:%d, keep:%d, tables:%d, origin replica:%d keep:%d", pDb->name,
// pAlter->replications, pAlter->maxSessions, pAlter->daysToKeep,
// pDb->cfg.replications, pDb->cfg.daysToKeep);
// return TSDB_CODE_INVALID_OPTION;
// }
//
// if (sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1) < 0) {
// return TSDB_CODE_SDB_ERROR;
// }
//
// SVgObj *pVgroup = pDb->pHead;
// while (pVgroup != NULL) {
// balanceUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0);
// if (oldReplicaNum < pDb->cfg.replications) {
// if (!balanceAddVnode(pVgroup, NULL, NULL)) {
// mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId);
// code = TSDB_CODE_NO_ENOUGH_DNODES;
// }
// }
// if (pAlter->maxSessions > 0) {
// //rebuild meterList in mgmtVgroup.c
// mgmtUpdateVgroup(pVgroup);
// }
//// mgmtSendCreateVnodeMsg(pVgroup);
// pVgroup = pVgroup->next;
// }
// mgmtStartBalanceTimer(10);
//
// return code;
}
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) {
pVgroup->next = pDb->pHead;
pVgroup->prev = NULL;
......@@ -472,7 +402,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
strcpy(pSchema[cols].name, "create time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -600,7 +530,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
return 0;
}
char *mgmtGetDbStr(char *src) {
static char *mgmtGetDbStr(char *src) {
char *pos = strstr(src, TS_PATH_DELIMITER);
return ++pos;
......@@ -740,8 +670,23 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, -1);
}
static void mgmtSetDbDirty(SDbObj *pDb) {
static int32_t mgmtSetDbDirty(SDbObj *pDb) {
if (pDb->dirty) return TSDB_CODE_SUCCESS;
pDb->dirty = true;
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsDbSdb,
.pObj = pDb,
.rowSize = tsDbUpdateSize
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SDB_ERROR;
}
return code;
}
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
......@@ -766,32 +711,121 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
} else {
code = mgmtCreateDb(pMsg->pUser->pAcct, pCreate);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is created by %s", pCreate->db, pMsg->pUser->user);
mLPrint("db:%s, is created by %s", pCreate->db, pMsg->pUser->user);
}
}
mgmtSendSimpleResp(pMsg->thandle, code);
}
static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
SDbCfg newCfg = pDb->cfg;
int32_t daysToKeep = htonl(pAlter->daysToKeep);
int32_t maxSessions = htonl(pAlter->maxSessions) + 1;
int8_t replications = pAlter->replications;
terrno = TSDB_CODE_SUCCESS;
if (daysToKeep > 0 && daysToKeep != pDb->cfg.daysToKeep) {
mTrace("db:%s, daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, daysToKeep);
newCfg.daysToKeep = daysToKeep;
} else if (replications > 0 && replications != pDb->cfg.replications) {
mTrace("db:%s, replica:%d change to %d", pDb->name, pDb->cfg.replications, replications);
if (replications < TSDB_REPLICA_MIN_NUM || replications > TSDB_REPLICA_MAX_NUM) {
mError("invalid db option replica: %d valid range: %d--%d", replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM);
terrno = TSDB_CODE_INVALID_OPTION;
}
newCfg.replications = replications;
} else if (maxSessions > 0 && maxSessions != pDb->cfg.maxSessions) {
mTrace("db:%s, tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, maxSessions);
if (maxSessions < TSDB_MIN_TABLES_PER_VNODE || maxSessions > TSDB_MAX_TABLES_PER_VNODE) {
mError("invalid db option tables: %d valid range: %d--%d", maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE);
terrno = TSDB_CODE_INVALID_OPTION;
}
if (maxSessions < pDb->cfg.maxSessions) {
mError("invalid db option tables: %d should larger than original:%d", maxSessions, pDb->cfg.maxSessions);
terrno = TSDB_CODE_INVALID_OPTION;
}
newCfg.maxSessions = maxSessions;
} else {
}
return newCfg;
}
static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
SDbCfg newCfg = mgmtGetAlterDbOption(pDb, pAlter);
if (terrno != TSDB_CODE_SUCCESS) {
return terrno;
}
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
pDb->cfg = newCfg;
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsDbSdb,
.pObj = pDb,
.rowSize = tsDbUpdateSize
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SDB_ERROR;
}
}
return TSDB_CODE_SUCCESS;
}
static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
SCMAlterDbMsg *pAlter = pMsg->pCont;
pAlter->daysPerFile = htonl(pAlter->daysPerFile);
pAlter->daysToKeep = htonl(pAlter->daysToKeep);
pAlter->maxSessions = htonl(pAlter->maxSessions) + 1;
mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle);
if (mgmtCheckExpired()) {
mError("db:%s, failed to alter, grant expired", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED);
return;
}
int32_t code;
if (!pMsg->pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
} else {
code = mgmtAlterDb(pMsg->pUser->pAcct, pAlter);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is altered by %s", pAlter->db, pMsg->pUser->user);
}
mError("db:%s, failed to alter, no rights", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
mgmtSendSimpleResp(pMsg->thandle, code);
SDbObj *pDb = mgmtGetDb(pAlter->db);
if (pDb == NULL) {
mError("db:%s, failed to alter, invalid db", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
return;
}
int32_t code = mgmtAlterDb(pDb, pAlter);
if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to alter, invalid db option", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, code);
}
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
pMsg->pCont = NULL;
SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgroup:%d, will be altered", pVgroup->vgId);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtAlterVgroup(pVgroup, newMsg);
return;
}
mTrace("db:%s, all vgroups is altered", pDb->name);
mgmtSendSimpleResp(newMsg->thandle, TSDB_CODE_SUCCESS);
rpcFreeCont(newMsg->pCont);
free(newMsg);
}
static void mgmtDropDb(void *handle, void *tmrId) {
......@@ -799,7 +833,12 @@ static void mgmtDropDb(void *handle, void *tmrId) {
SDbObj *pDb = newMsg->ahandle;
mPrint("db:%s, drop db from sdb", pDb->name);
int32_t code = sdbDeleteRow(tsDbSdb, pDb, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsDbSdb,
.pObj = pDb
};
int32_t code = sdbDeleteRow(&oper);
if (code != 0) {
code = TSDB_CODE_SDB_ERROR;
}
......@@ -846,7 +885,12 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
return;
}
mgmtSetDbDirty(pDb);
int32_t code = mgmtSetDbDirty(pDb);
if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code));
mgmtSendSimpleResp(pMsg->thandle, code);
return;
}
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
......
......@@ -104,9 +104,6 @@ static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, s
static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash};
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData};
void sdbResetTable(SSdbTable *pTable);
void sdbSaveSnapShot(void *handle);
uint64_t sdbGetVersion() { return sdbVersion; }
int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->version; }
int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; }
......@@ -388,10 +385,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
pTable->autoIndex = maxAutoIndex;
}
if (numOfChanged > pTable->hashSessions / 4) {
sdbSaveSnapShot(pTable);
}
tfree(rowHead);
return 0;
}
......@@ -538,7 +531,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
pthread_mutex_unlock(&pTable->mutex);
sdbTrace("table:%s, a record is inserted:%s, sdbversion:%" PRId64 " version:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
sdbTrace("table:%s, inserte record:%s, sdbversion:%" PRId64 " version:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pOper->rowSize, pTable->numOfRows, pTable->fileSize);
(*pTable->insertFp)(pOper);
......@@ -578,7 +571,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
}
}
int32_t total_size = sizeof(SRowHead) + pOper->rowSize + sizeof(TSCKSUM);
int32_t total_size = sizeof(SRowHead) + pMeta->rowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)calloc(1, total_size);
if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb:%s", pTable->tableName);
......@@ -621,7 +614,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
tfree(rowHead);
sdbTrace("table:%s, a record is deleted:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%d",
sdbTrace("table:%s, delete record:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%d",
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows);
// Delete from current layer
......@@ -644,7 +637,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->pObj);
if (pMeta == NULL) {
sdbError("table:%s, failed to update record:%s, record is not there, sdbversion:%" PRId64 " id:%" PRId64,
sdbError("table:%s, failed to update record:%s, record is not there, sdbversion:%" PRId64 " version:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version);
return -1;
}
......@@ -697,7 +690,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
rowHead->delimiter = SDB_DELIMITER;
rowHead->version = pTable->version;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum, sdb:%s version:%d", pTable->tableName, rowHead->version);
sdbError("table:%s, failed to get checksum, version:%d", pTable->tableName, rowHead->version);
pTable->version--;
sdbVersion--;
pthread_mutex_unlock(&pTable->mutex);
......@@ -709,7 +702,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
pTable->fileSize += real_size;
sdbFinishCommit(pTable);
sdbTrace("table:%s, a record is updated:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64,
sdbTrace("table:%s, update record:%s, sdbversion:%" PRId64 " version:%" PRId64 " numOfRows:%" PRId64,
pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pTable->version, pTable->numOfRows);
pMeta->version = pTable->version;
......@@ -750,187 +743,11 @@ void sdbCloseTable(void *handle) {
pthread_mutex_destroy(&pTable->mutex);
sdbNumOfTables--;
sdbTrace("table:%s is closed, id:%" PRId64 " numOfTables:%d", pTable->tableName, pTable->version, sdbNumOfTables);
sdbTrace("table:%s, is closed, version:%" PRId64 " numOfTables:%d", pTable->tableName, pTable->version, sdbNumOfTables);
tfree(pTable);
}
void sdbResetTable(SSdbTable *pTable) {
/*
SRowMeta rowMeta;
int32_t bytes;
int32_t total_size = 0;
int32_t real_size = 0;
SRowHead *rowHead = NULL;
void * pMetaRow = NULL;
int64_t oldId = pTable->version;
int32_t oldNumOfRows = pTable->numOfRows;
if (sdbOpenSdbFile(pTable) < 0) return;
pTable->numOfRows = oldNumOfRows;
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
sdbError("failed to allocate row head memory for reset, sdb:%s", pTable->tableName);
return;
}
sdbPrint("open sdb file:%s for reset table", pTable->fileName);
while (1) {
memset(rowHead, 0, total_size);
bytes = read(pTable->fd, rowHead, sizeof(SRowHead));
if (bytes < 0) {
sdbError("failed to read sdb file:%s", pTable->fileName);
tfree(rowHead);
return;
}
if (bytes == 0) break;
if (bytes < sizeof(SRowHead) || rowHead->delimiter != SDB_DELIMITER) {
pTable->fileSize++;
lseek(pTable->fd, -(bytes - 1), SEEK_CUR);
continue;
}
if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) {
sdbError("error row size in sdb file:%s for reset, version:%d rowSize:%d maxRowSize:%d",
pTable->fileName, rowHead->version, rowHead->rowSize, pTable->maxRowSize);
pTable->fileSize += sizeof(SRowHead);
continue;
}
bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM));
if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) {
sdbError("failed to read sdb file:%s for reset, version:%d rowSize:%d", pTable->fileName, rowHead->version, rowHead->rowSize);
break;
}
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) {
sdbError("error sdb checksum, sdb:%s version:%d, skip", pTable->tableName, rowHead->version);
pTable->fileSize += real_size;
continue;
}
if (abs(rowHead->version) > oldId) { // not operated
pMetaRow = sdbGetRow(pTable, rowHead->data);
if (pMetaRow == NULL) { // New object
if (rowHead->version < 0) {
sdbError("error sdb negative version:%d, sdb:%s, skip", rowHead->version, pTable->tableName);
} else {
rowMeta.version = rowHead->version;
// TODO:Get rid of the rowMeta.offset and rowSize
rowMeta.offset = pTable->fileSize;
rowMeta.rowSize = rowHead->rowSize;
rowMeta.row = (*pTable->decodeFp)(rowHead->data);
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta);
pTable->numOfRows++;
(*pTable->insertFp)(rowMeta.row);
}
} else { // already exists
if (rowHead->version < 0) { // Delete the object
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
(*pTable->destroyFp)(pMetaRow);
pTable->numOfRows--;
} else { // update the object
(*pTable->updateFp)(pMetaRow);
}
}
}
pTable->fileSize += real_size;
if (pTable->version < abs(rowHead->version)) pTable->version = abs(rowHead->version);
}
sdbVersion += (pTable->version - oldId);
tfree(rowHead);
sdbPrint("table:%s is updated, sdbVerion:%" PRId64 " id:%" PRId64, pTable->tableName, sdbVersion, pTable->version);
*/
}
// TODO:A problem here :use snapshot file to sync another node will cause problem
void sdbSaveSnapShot(void *handle) {
/*
SSdbTable *pTable = (SSdbTable *)handle;
SRowMeta * pMeta;
void * pNode = NULL;
int32_t total_size = 0;
int32_t real_size = 0;
int32_t size = 0;
int32_t numOfRows = 0;
uint32_t sdbEcommit = SDB_ENDCOMMIT;
char * dirc = NULL;
char * basec = NULL;
if (pTable == NULL) return;
sdbTrace("Table:%s, save the snapshop", pTable->tableName);
char fn[128] = "\0";
dirc = strdup(pTable->fileName);
basec = strdup(pTable->fileName);
sprintf(fn, "%s/.%s", dirname(dirc), basename(basec));
int32_t fd = open(fn, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
tfree(dirc);
tfree(basec);
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
sdbError("failed to allocate memory while saving SDB snapshot, sdb:%s", pTable->tableName);
return;
}
memset(rowHead, 0, size);
// Write the header
twrite(fd, &(pTable->header), sizeof(SSdbHeader));
size += sizeof(SSdbHeader);
twrite(fd, &sdbEcommit, sizeof(sdbEcommit));
size += sizeof(sdbEcommit);
while (1) {
pNode = (*sdbFetchRowFp[pTable->keyType])(pTable->iHandle, pNode, (void **)&pMeta);
if (pMeta == NULL) break;
rowHead->delimiter = SDB_DELIMITER;
rowHead->version = pMeta->id;
rowHead->rowSize = (*pTable->encodeFp)(pMeta->row, rowHead->data, pTable->maxRowSize);
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum while save sdb %s snapshot", pTable->tableName);
tfree(rowHead);
return;
}
twrite(fd, rowHead, real_size);
size += real_size;
twrite(fd, &sdbEcommit, sizeof(sdbEcommit));
size += sizeof(sdbEcommit);
numOfRows++;
}
tfree(rowHead);
// Remove the old file
tclose(pTable->fd);
remove(pTable->fileName);
// Rename the .sdb.db file to sdb.db file
rename(fn, pTable->fileName);
pTable->fd = fd;
pTable->fileSize = size;
pTable->numOfRows = numOfRows;
fdatasync(pTable->fd);
*/
}
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
SSdbTable *pTable = (SSdbTable *)handle;
SRowMeta * pMeta;
......
......@@ -46,7 +46,15 @@ static int32_t mgmtUserActionDestroy(SSdbOperDesc *pOper) {
static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) {
SUserObj *pUser = pOper->pObj;
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
mgmtAddUserIntoAcct(pAcct, pUser);
if (pAcct != NULL) {
mgmtAddUserIntoAcct(pAcct, pUser);
}
else {
mError("user:%s, acct:%s info not exist in sdb", pUser->user, pUser->acct);
return TSDB_CODE_INVALID_ACCT;
}
return TSDB_CODE_SUCCESS;
}
......@@ -77,7 +85,7 @@ static int32_t mgmtUserActionEncode(SSdbOperDesc *pOper) {
static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) {
SUserObj *pUser = (SUserObj *) calloc(1, sizeof(SUserObj));
if (pUser == NULL) return -1;
if (pUser == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
memcpy(pUser, pOper->rowData, tsUserUpdateSize);
pOper->pObj = pUser;
......@@ -216,7 +224,6 @@ static int32_t mgmtDropUser(SAcctObj *pAcct, char *name) {
int32_t code = sdbDeleteRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
tfree(pUser);
code = TSDB_CODE_SDB_ERROR;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册