提交 0a3b4046 编写于 作者: G Ganlin Zhao

feat: add audit db for DDL storage

上级 09261647
...@@ -160,7 +160,7 @@ static int32_t mnodeDbActionEncode(SSdbRow *pRow) { ...@@ -160,7 +160,7 @@ static int32_t mnodeDbActionEncode(SSdbRow *pRow) {
static int32_t mnodeDbActionDecode(SSdbRow *pRow) { static int32_t mnodeDbActionDecode(SSdbRow *pRow) {
SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj)); SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj));
if (pDb == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pDb == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pDb, pRow->rowData, tsDbUpdateSize); memcpy(pDb, pRow->rowData, tsDbUpdateSize);
pRow->pObj = pDb; pRow->pObj = pDb;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -205,8 +205,8 @@ int32_t mnodeInitDbs() { ...@@ -205,8 +205,8 @@ int32_t mnodeInitDbs() {
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
mDebug("table:dbs table is created"); mDebug("table:dbs table is created");
return tpInit(); return tpInit();
} }
...@@ -224,11 +224,11 @@ SDbObj *mnodeGetDb(char *db) { ...@@ -224,11 +224,11 @@ SDbObj *mnodeGetDb(char *db) {
} }
void mnodeIncDbRef(SDbObj *pDb) { void mnodeIncDbRef(SDbObj *pDb) {
sdbIncRef(tsDbSdb, pDb); sdbIncRef(tsDbSdb, pDb);
} }
void mnodeDecDbRef(SDbObj *pDb) { void mnodeDecDbRef(SDbObj *pDb) {
sdbDecRef(tsDbSdb, pDb); sdbDecRef(tsDbSdb, pDb);
} }
SDbObj *mnodeGetDbByTableName(char *tableName) { SDbObj *mnodeGetDbByTableName(char *tableName) {
...@@ -420,7 +420,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * ...@@ -420,7 +420,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
SDbObj *pDb = mnodeGetDb(pCreate->db); SDbObj *pDb = mnodeGetDb(pCreate->db);
if (pDb != NULL) { if (pDb != NULL) {
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
if (pCreate->ignoreExist) { if (pCreate->ignoreExist) {
mDebug("db:%s, already exist, ignore exist is set", pCreate->db); mDebug("db:%s, already exist, ignore exist is set", pCreate->db);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -435,8 +435,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * ...@@ -435,8 +435,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
pDb = calloc(1, sizeof(SDbObj)); pDb = calloc(1, sizeof(SDbObj));
tstrncpy(pDb->name, pCreate->db, sizeof(pDb->name)); tstrncpy(pDb->name, pCreate->db, sizeof(pDb->name));
tstrncpy(pDb->acct, pAcct->user, sizeof(pDb->acct)); tstrncpy(pDb->acct, pAcct->user, sizeof(pDb->acct));
pDb->createdTime = taosGetTimestampMs(); pDb->createdTime = taosGetTimestampMs();
pDb->cfg = (SDbCfg) { pDb->cfg = (SDbCfg) {
.cacheBlockSize = pCreate->cacheBlockSize, .cacheBlockSize = pCreate->cacheBlockSize,
.totalBlocks = pCreate->totalBlocks, .totalBlocks = pCreate->totalBlocks,
...@@ -500,7 +500,7 @@ bool mnodeCheckIsMonitorDB(char *db, char *monitordb) { ...@@ -500,7 +500,7 @@ bool mnodeCheckIsMonitorDB(char *db, char *monitordb) {
#if 0 #if 0
void mnodePrintVgroups(SDbObj *pDb, char *row) { void mnodePrintVgroups(SDbObj *pDb, char *row) {
mInfo("db:%s, vgroup link from head, row:%s", pDb->name, row); mInfo("db:%s, vgroup link from head, row:%s", pDb->name, row);
SVgObj *pVgroup = pDb->pHead; SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) { while (pVgroup != NULL) {
mInfo("vgId:%d", pVgroup->vgId); mInfo("vgId:%d", pVgroup->vgId);
...@@ -622,7 +622,7 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn ...@@ -622,7 +622,7 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE; pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
#ifdef _STORAGE #ifdef _STORAGE
strcpy(pSchema[cols].name, "keep0,keep1,keep2"); strcpy(pSchema[cols].name, "keep0,keep1,keep2");
#else #else
strcpy(pSchema[cols].name, "keep"); strcpy(pSchema[cols].name, "keep");
...@@ -639,13 +639,13 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn ...@@ -639,13 +639,13 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
strcpy(pSchema[cols].name, "cache(MB)"); strcpy(pSchema[cols].name, "cache(MB)");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 4; pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT; pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "blocks"); strcpy(pSchema[cols].name, "blocks");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 4; pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT; pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "minrows"); strcpy(pSchema[cols].name, "minrows");
...@@ -744,7 +744,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -744,7 +744,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
cols = 0; cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* name = mnodeGetDbStr(pDb->name); char* name = mnodeGetDbStr(pDb->name);
if (name != NULL) { if (name != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
...@@ -790,10 +790,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -790,10 +790,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
#endif #endif
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char tmp[128] = {0}; char tmp[128] = {0};
#ifdef _STORAGE #ifdef _STORAGE
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) { if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0); sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0);
} else { } else {
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2); sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2);
...@@ -822,7 +822,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -822,7 +822,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxRowsPerFileBlock; *(int32_t *)pWrite = pDb->cfg.maxRowsPerFileBlock;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.walLevel; *(int8_t *)pWrite = pDb->cfg.walLevel;
cols++; cols++;
...@@ -913,7 +913,7 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) { ...@@ -913,7 +913,7 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
} }
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
SCreateDbMsg *pCreate = pMsg->rpcMsg.pCont; SCreateDbMsg *pCreate = pMsg->rpcMsg.pCont;
pCreate->maxTables = htonl(pCreate->maxTables); pCreate->maxTables = htonl(pCreate->maxTables);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->totalBlocks = htonl(pCreate->totalBlocks); pCreate->totalBlocks = htonl(pCreate->totalBlocks);
...@@ -926,7 +926,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) { ...@@ -926,7 +926,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
pCreate->partitions = htons(pCreate->partitions); pCreate->partitions = htons(pCreate->partitions);
pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock); pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock); pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
int32_t code; int32_t code;
#ifdef GRANT_CHECK_WRITE #ifdef GRANT_CHECK_WRITE
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
...@@ -964,7 +964,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { ...@@ -964,7 +964,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
int8_t cacheLastRow = pAlter->cacheLastRow; int8_t cacheLastRow = pAlter->cacheLastRow;
int8_t dbType = pAlter->dbType; int8_t dbType = pAlter->dbType;
int16_t partitions = htons(pAlter->partitions); int16_t partitions = htons(pAlter->partitions);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
//UPGRATE FROM LOW VERSION, reorder it //UPGRATE FROM LOW VERSION, reorder it
...@@ -1198,7 +1198,7 @@ int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) { ...@@ -1198,7 +1198,7 @@ int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
mError("db:%s, failed to alter, invalid db", pAlter->db); mError("db:%s, failed to alter, invalid db", pAlter->db);
return TSDB_CODE_MND_INVALID_DB; return TSDB_CODE_MND_INVALID_DB;
} }
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) { if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pAlter->db, pMsg->pDb->status); mError("db:%s, status:%d, in dropping", pAlter->db, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING; return TSDB_CODE_MND_DB_IN_DROPPING;
...@@ -1220,7 +1220,7 @@ static int32_t mnodeDropDbCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1220,7 +1220,7 @@ static int32_t mnodeDropDbCb(SMnodeMsg *pMsg, int32_t code) {
static int32_t mnodeDropDb(SMnodeMsg *pMsg) { static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
mInfo("db:%s, drop db from sdb", pDb->name); mInfo("db:%s, drop db from sdb", pDb->name);
...@@ -1260,7 +1260,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { ...@@ -1260,7 +1260,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
mError("db:%s, can't drop monitor database", pDrop->db); mError("db:%s, can't drop monitor database", pDrop->db);
return TSDB_CODE_MND_MONITOR_DB_FORBIDDEN; return TSDB_CODE_MND_MONITOR_DB_FORBIDDEN;
} }
#endif #endif
int32_t code = mnodeSetDbDropping(pMsg->pDb); int32_t code = mnodeSetDbDropping(pMsg->pDb);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
...@@ -1293,15 +1293,15 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) { ...@@ -1293,15 +1293,15 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) { static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
int32_t count = ntohs(pCompactMsg->numOfVgroup); int32_t count = ntohs(pCompactMsg->numOfVgroup);
int32_t *buf = malloc(sizeof(int32_t) * count); int32_t *buf = malloc(sizeof(int32_t) * count);
if (buf == NULL) { if (buf == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
for (int32_t i = 0; i < count; i++) { for (int32_t i = 0; i < count; i++) {
buf[i] = ntohs(pCompactMsg->vgid[i]); buf[i] = ntohs(pCompactMsg->vgid[i]);
} }
// copy from mnodeSyncDb, so ugly // copy from mnodeSyncDb, so ugly
for (int32_t i = 0; i < count; i++) { for (int32_t i = 0; i < count; i++) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
...@@ -1313,7 +1313,7 @@ static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) { ...@@ -1313,7 +1313,7 @@ static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
if (pVgroup->pDb == pDb && pVgroup->vgId == buf[i]) { if (pVgroup->pDb == pDb && pVgroup->vgId == buf[i]) {
mnodeSendCompactVgroupMsg(pVgroup); mnodeSendCompactVgroupMsg(pVgroup);
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
valid = true; valid = true;
break; break;
} }
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
...@@ -1322,7 +1322,7 @@ static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) { ...@@ -1322,7 +1322,7 @@ static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
mLError("db:%s, cannot find valid vgId: %d", pDb->name, buf[i]); mLError("db:%s, cannot find valid vgId: %d", pDb->name, buf[i]);
} }
} }
free(buf); free(buf);
mLInfo("db:%s, trigger compact", pDb->name); mLInfo("db:%s, trigger compact", pDb->name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1353,14 +1353,14 @@ static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) { ...@@ -1353,14 +1353,14 @@ static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
static int32_t mnodeProcessCompactMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCompactMsg(SMnodeMsg *pMsg) {
SCompactMsg *pCompact = pMsg->rpcMsg.pCont; SCompactMsg *pCompact = pMsg->rpcMsg.pCont;
mDebug("db:%s, compact is received from thandle:%p", pCompact->db, pMsg->rpcMsg.handle); mDebug("db:%s, compact is received from thandle:%p", pCompact->db, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCompact->db); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCompact->db);
if (pMsg->pDb == NULL) return TSDB_CODE_MND_DB_NOT_SELECTED; if (pMsg->pDb == NULL) return TSDB_CODE_MND_DB_NOT_SELECTED;
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) { if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping, ignore compact request", pCompact->db, pMsg->pDb->status); mError("db:%s, status:%d, in dropping, ignore compact request", pCompact->db, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING; return TSDB_CODE_MND_DB_IN_DROPPING;
} }
return mnodeCompact(pMsg->pDb, pCompact); return mnodeCompact(pMsg->pDb, pCompact);
} }
...@@ -1383,7 +1383,7 @@ void mnodeDropAllDbs(SAcctObj *pAcct) { ...@@ -1383,7 +1383,7 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
.pTable = tsDbSdb, .pTable = tsDbSdb,
.pObj = pDb .pObj = pDb
}; };
sdbDeleteRow(&row); sdbDeleteRow(&row);
numOfDbs++; numOfDbs++;
} }
...@@ -1411,11 +1411,11 @@ int32_t mnodeCompactDbs() { ...@@ -1411,11 +1411,11 @@ int32_t mnodeCompactDbs() {
}; };
mInfo("compact dbs %s", pDb->name); mInfo("compact dbs %s", pDb->name);
sdbInsertCompactRow(&row); sdbInsertCompactRow(&row);
} }
mInfo("end to compact dbs table..."); mInfo("end to compact dbs table...");
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册