未验证 提交 dc2cbc0f 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1724 from taosdata/feature/alter

Feature/alter
...@@ -46,7 +46,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -46,7 +46,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = param; pSql->param = param;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_REPLICA_MAX_NUM; pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp; pSql->fp = fp;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
......
...@@ -4806,16 +4806,15 @@ static int32_t setTimePrecisionOption(SSqlCmd* pCmd, SCMCreateDbMsg* pMsg, SCrea ...@@ -4806,16 +4806,15 @@ static int32_t setTimePrecisionOption(SSqlCmd* pCmd, SCMCreateDbMsg* pMsg, SCrea
} }
static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) { static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
pMsg->blocksPerTable = htons(pCreateDb->numOfBlocksPerTable);
pMsg->compression = pCreateDb->compressionLevel;
pMsg->commitLog = (char)pCreateDb->commitLog;
pMsg->commitTime = htonl(pCreateDb->commitTime);
pMsg->maxSessions = htonl(pCreateDb->tablesPerVnode); pMsg->maxSessions = htonl(pCreateDb->tablesPerVnode);
pMsg->cacheNumOfBlocks.fraction = pCreateDb->numOfAvgCacheBlocks; pMsg->cacheBlockSize = htonl(-1);
pMsg->cacheBlockSize = htonl(pCreateDb->cacheBlockSize); pMsg->totalBlocks = htonl(-1);
pMsg->rowsInFileBlock = htonl(pCreateDb->rowPerFileBlock);
pMsg->daysPerFile = htonl(pCreateDb->daysPerFile); pMsg->daysPerFile = htonl(pCreateDb->daysPerFile);
pMsg->commitTime = htonl(pCreateDb->commitTime);
pMsg->minRowsPerFileBlock = htonl(-1);
pMsg->maxRowsPerFileBlock = htonl(-1);
pMsg->compression = pCreateDb->compressionLevel;
pMsg->commitLog = (char)pCreateDb->commitLog;
pMsg->replications = pCreateDb->replica; pMsg->replications = pCreateDb->replica;
pMsg->ignoreExist = pCreateDb->ignoreExists; pMsg->ignoreExist = pCreateDb->ignoreExists;
} }
...@@ -5348,29 +5347,22 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { ...@@ -5348,29 +5347,22 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
char msg[512] = {0}; char msg[512] = {0};
if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 2)) { if (pCreate->commitLog != -1 && (pCreate->commitLog < TSDB_MIN_CLOG_LEVEL || pCreate->commitLog > TSDB_MAX_CLOG_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog); snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
if (pCreate->replications != -1 && if (pCreate->replications != -1 &&
(pCreate->replications < TSDB_REPLICA_MIN_NUM || pCreate->replications > TSDB_REPLICA_MAX_NUM)) { (pCreate->replications < TSDB_MIN_REPLICA_NUM || pCreate->replications > TSDB_MAX_REPLICA_NUM)) {
snprintf(msg, tListLen(msg), "invalid db option replications: %d valid range: [%d, %d]", pCreate->replications, snprintf(msg, tListLen(msg), "invalid db option replications: %d valid range: [%d, %d]", pCreate->replications,
TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM); TSDB_MIN_REPLICA_NUM, TSDB_MAX_REPLICA_NUM);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
int32_t val = htonl(pCreate->daysPerFile); int32_t val = htonl(pCreate->daysPerFile);
if (val != -1 && (val < TSDB_FILE_MIN_PARTITION_RANGE || val > TSDB_FILE_MAX_PARTITION_RANGE)) { if (val != -1 && (val < TSDB_MIN_DAYS_PER_FILE || val > TSDB_MAX_DAYS_PER_FILE)) {
snprintf(msg, tListLen(msg), "invalid db option daysPerFile: %d valid range: [%d, %d]", val, snprintf(msg, tListLen(msg), "invalid db option daysPerFile: %d valid range: [%d, %d]", val,
TSDB_FILE_MIN_PARTITION_RANGE, TSDB_FILE_MAX_PARTITION_RANGE); TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
val = htonl(pCreate->rowsInFileBlock);
if (val != -1 && (val < TSDB_MIN_ROWS_IN_FILEBLOCK || val > TSDB_MAX_ROWS_IN_FILEBLOCK)) {
snprintf(msg, tListLen(msg), "invalid db option rowsInFileBlock: %d valid range: [%d, %d]", val,
TSDB_MIN_ROWS_IN_FILEBLOCK, TSDB_MAX_ROWS_IN_FILEBLOCK);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
...@@ -5382,9 +5374,9 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { ...@@ -5382,9 +5374,9 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
} }
val = htonl(pCreate->maxSessions); val = htonl(pCreate->maxSessions);
if (val != -1 && (val < TSDB_MIN_TABLES_PER_VNODE || val > TSDB_MAX_TABLES_PER_VNODE)) { if (val != -1 && (val < TSDB_MIN_TABLES || val > TSDB_MAX_TABLES)) {
snprintf(msg, tListLen(msg), "invalid db option maxSessions: %d valid range: [%d, %d]", val, snprintf(msg, tListLen(msg), "invalid db option maxSessions: %d valid range: [%d, %d]", val,
TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE); TSDB_MIN_TABLES, TSDB_MAX_TABLES);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
...@@ -5394,24 +5386,17 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { ...@@ -5394,24 +5386,17 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
if (pCreate->cacheNumOfBlocks.fraction != -1 && (pCreate->cacheNumOfBlocks.fraction < TSDB_MIN_AVG_BLOCKS ||
pCreate->cacheNumOfBlocks.fraction > TSDB_MAX_AVG_BLOCKS)) {
snprintf(msg, tListLen(msg), "invalid db option ablocks: %f valid value: [%d, %d]",
pCreate->cacheNumOfBlocks.fraction, TSDB_MIN_AVG_BLOCKS, TSDB_MAX_AVG_BLOCKS);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
val = htonl(pCreate->commitTime); val = htonl(pCreate->commitTime);
if (val != -1 && (val < TSDB_MIN_COMMIT_TIME_INTERVAL || val > TSDB_MAX_COMMIT_TIME_INTERVAL)) { if (val != -1 && (val < TSDB_MIN_COMMIT_TIME || val > TSDB_MAX_COMMIT_TIME)) {
snprintf(msg, tListLen(msg), "invalid db option commitTime: %d valid range: [%d, %d]", val, snprintf(msg, tListLen(msg), "invalid db option commitTime: %d valid range: [%d, %d]", val,
TSDB_MIN_COMMIT_TIME_INTERVAL, TSDB_MAX_COMMIT_TIME_INTERVAL); TSDB_MIN_COMMIT_TIME, TSDB_MAX_COMMIT_TIME);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
if (pCreate->compression != -1 && if (pCreate->compression != -1 &&
(pCreate->compression < TSDB_MIN_COMPRESSION_LEVEL || pCreate->compression > TSDB_MAX_COMPRESSION_LEVEL)) { (pCreate->compression < TSDB_MIN_COMP_LEVEL || pCreate->compression > TSDB_MAX_COMP_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option compression: %d valid range: [%d, %d]", pCreate->compression, snprintf(msg, tListLen(msg), "invalid db option compression: %d valid range: [%d, %d]", pCreate->compression,
TSDB_MIN_COMPRESSION_LEVEL, TSDB_MAX_COMPRESSION_LEVEL); TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }
......
...@@ -1323,11 +1323,11 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1323,11 +1323,11 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name); strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
pAlterTableMsg->type = htons(pAlterInfo->type); pAlterTableMsg->type = htons(pAlterInfo->type);
pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo)); pAlterTableMsg->numOfCols = tscNumOfFields(pQueryInfo);
memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN); memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN);
SSchema *pSchema = pAlterTableMsg->schema; SSchema *pSchema = pAlterTableMsg->schema;
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { for (int i = 0; i < pAlterTableMsg->numOfCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pSchema->type = pField->type; pSchema->type = pField->type;
...@@ -1352,11 +1352,6 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1352,11 +1352,6 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = sizeof(SCMAlterDbMsg); pCmd->payloadLen = sizeof(SCMAlterDbMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB; pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload; SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pAlterDbMsg->db, pTableMetaInfo->name); strcpy(pAlterDbMsg->db, pTableMetaInfo->name);
......
...@@ -131,7 +131,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -131,7 +131,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->signature = pSql; pSql->signature = pSql;
pSql->maxRetry = TSDB_REPLICA_MAX_NUM; pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
......
...@@ -74,11 +74,13 @@ extern int16_t tsNumOfVnodesPerCore; ...@@ -74,11 +74,13 @@ extern int16_t tsNumOfVnodesPerCore;
extern int16_t tsNumOfTotalVnodes; extern int16_t tsNumOfTotalVnodes;
extern uint32_t tsPublicIpInt; extern uint32_t tsPublicIpInt;
extern int32_t tsMaxCacheSize; extern int32_t tsCacheBlockSize;
extern int32_t tsSessionsPerVnode; extern int32_t tsTotalBlocks;
extern int32_t tsTablesPerVnode;
extern int16_t tsDaysPerFile; extern int16_t tsDaysPerFile;
extern int32_t tsDaysToKeep; extern int32_t tsDaysToKeep;
extern int32_t tsRowsInFileBlock; extern int32_t tsMinRowsInFileBlock;
extern int32_t tsMaxRowsInFileBlock;
extern int16_t tsCommitTime; // seconds extern int16_t tsCommitTime; // seconds
extern int32_t tsTimePrecision; extern int32_t tsTimePrecision;
extern int16_t tsCompression; extern int16_t tsCompression;
......
...@@ -85,20 +85,22 @@ int16_t tsNumOfVnodesPerCore = 8; ...@@ -85,20 +85,22 @@ int16_t tsNumOfVnodesPerCore = 8;
int16_t tsNumOfTotalVnodes = TSDB_INVALID_VNODE_NUM; int16_t tsNumOfTotalVnodes = TSDB_INVALID_VNODE_NUM;
#ifdef _TD_ARM_32_ #ifdef _TD_ARM_32_
int32_t tsSessionsPerVnode = 100; int32_t tsTablesPerVnode = 100;
#else #else
int32_t tsSessionsPerVnode = 1000; int32_t tsTablesPerVnode = TSDB_DEFAULT_TABLES;
#endif #endif
int32_t tsMaxCacheSize = 64; //64M int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
int16_t tsDaysPerFile = 10; int32_t tsTotalBlocks = TSDB_DEFAULT_TOTAL_BLOCKS;
int32_t tsDaysToKeep = 3650; int16_t tsDaysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
int32_t tsRowsInFileBlock = 4096; int32_t tsDaysToKeep = TSDB_DEFAULT_KEEP;
int16_t tsCommitTime = 3600; // seconds int32_t tsMinRowsInFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK;
int32_t tsTimePrecision = TSDB_TIME_PRECISION_MILLI; int32_t tsMaxRowsInFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK;
int16_t tsCompression = TSDB_MAX_COMPRESSION_LEVEL; int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds
int16_t tsCommitLog = 1; int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION;
int32_t tsReplications = TSDB_REPLICA_MIN_NUM; int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL;
int16_t tsCommitLog = TSDB_DEFAULT_CLOG_LEVEL;
int32_t tsReplications = TSDB_DEFAULT_REPLICA_NUM;
/** /**
* Change the meaning of affected rows: * Change the meaning of affected rows:
...@@ -567,16 +569,6 @@ static void doInitGlobalConfig() { ...@@ -567,16 +569,6 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_SECOND; cfg.unitType = TAOS_CFG_UTYPE_SECOND;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "ctime";
cfg.ptr = &tsCommitTime;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 30;
cfg.maxValue = 40960;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_SECOND;
taosInitConfigOption(cfg);
cfg.option = "statusInterval"; cfg.option = "statusInterval";
cfg.ptr = &tsStatusInterval; cfg.ptr = &tsStatusInterval;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
...@@ -678,32 +670,42 @@ static void doInitGlobalConfig() { ...@@ -678,32 +670,42 @@ static void doInitGlobalConfig() {
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
// database configs // database configs
cfg.option = "clog"; cfg.option = "tables";
cfg.ptr = &tsCommitLog; cfg.ptr = &tsTablesPerVnode;
cfg.valType = TAOS_CFG_VTYPE_INT16; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0; cfg.minValue = TSDB_MIN_TABLES;
cfg.maxValue = 2; cfg.maxValue = TSDB_MAX_TABLES;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "comp"; cfg.option = "cache";
cfg.ptr = &tsCompression; cfg.ptr = &tsCacheBlockSize;
cfg.valType = TAOS_CFG_VTYPE_INT16; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0; cfg.minValue = TSDB_MIN_CACHE_BLOCK_SIZE;
cfg.maxValue = 2; cfg.maxValue = TSDB_MAX_CACHE_BLOCK_SIZE;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosInitConfigOption(cfg);
cfg.option = "blocks";
cfg.ptr = &tsTotalBlocks;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_TOTAL_BLOCKS;
cfg.maxValue = TSDB_MAX_TOTAL_BLOCKS;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "days"; cfg.option = "days";
cfg.ptr = &tsDaysPerFile; cfg.ptr = &tsDaysPerFile;
cfg.valType = TAOS_CFG_VTYPE_INT16; cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1; cfg.minValue = TSDB_MIN_DAYS_PER_FILE;
cfg.maxValue = 365; cfg.maxValue = TSDB_MAX_DAYS_PER_FILE;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
...@@ -712,48 +714,68 @@ static void doInitGlobalConfig() { ...@@ -712,48 +714,68 @@ static void doInitGlobalConfig() {
cfg.ptr = &tsDaysToKeep; cfg.ptr = &tsDaysToKeep;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1; cfg.minValue = TSDB_MIN_KEEP;
cfg.maxValue = 365000; cfg.maxValue = TSDB_MAX_KEEP;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "replica"; cfg.option = "minRows";
cfg.ptr = &tsReplications; cfg.ptr = &tsMinRowsInFileBlock;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1; cfg.minValue = TSDB_MIN_MIN_ROW_FBLOCK;
cfg.maxValue = 3; cfg.maxValue = TSDB_MAX_MIN_ROW_FBLOCK;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "tables"; cfg.option = "maxRows";
cfg.ptr = &tsSessionsPerVnode; cfg.ptr = &tsMaxRowsInFileBlock;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_TABLES_PER_VNODE; cfg.minValue = TSDB_MIN_MAX_ROW_FBLOCK;
cfg.maxValue = TSDB_MAX_TABLES_PER_VNODE; cfg.maxValue = TSDB_MAX_MAX_ROW_FBLOCK;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "cache"; cfg.option = "ctime";
cfg.ptr = &tsMaxCacheSize; cfg.ptr = &tsCommitTime;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1; cfg.minValue = TSDB_MIN_COMMIT_TIME;
cfg.maxValue = 100000; cfg.maxValue = TSDB_MAX_COMMIT_TIME;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE; cfg.unitType = TAOS_CFG_UTYPE_SECOND;
taosInitConfigOption(cfg);
cfg.option = "comp";
cfg.ptr = &tsCompression;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_COMP_LEVEL;
cfg.maxValue = TSDB_MAX_COMP_LEVEL;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "clog";
cfg.ptr = &tsCommitLog;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_CLOG_LEVEL;
cfg.maxValue = TSDB_MAX_CLOG_LEVEL;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "rows"; cfg.option = "replica";
cfg.ptr = &tsRowsInFileBlock; cfg.ptr = &tsReplications;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 200; cfg.minValue = TSDB_MIN_REPLICA_NUM;
cfg.maxValue = 1048576; cfg.maxValue = TSDB_MAX_REPLICA_NUM;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
......
...@@ -128,14 +128,16 @@ static void dnodeCloseVnodes() { ...@@ -128,14 +128,16 @@ static void dnodeCloseVnodes() {
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables); pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.maxCacheSize = htobe64(pCreate->cfg.maxCacheSize); pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock); pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1); pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2); pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep); pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp); pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp);
......
...@@ -221,44 +221,56 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -221,44 +221,56 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_MPEERS 5 #define TSDB_MAX_MPEERS 5
#define TSDB_MAX_MGMT_IPS (TSDB_MAX_MPEERS+1) #define TSDB_MAX_MGMT_IPS (TSDB_MAX_MPEERS+1)
#define TSDB_REPLICA_MIN_NUM 1
#define TSDB_REPLICA_MAX_NUM 3
#define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_METERMETA_MAX_NUM 100000 // maximum batch size allowed to load metermeta #define TSDB_MULTI_METERMETA_MAX_NUM 100000 // maximum batch size allowed to load metermeta
//default value == 10 #define TSDB_MIN_CACHE_BLOCK_SIZE 1
#define TSDB_FILE_MIN_PARTITION_RANGE 1 //minimum partition range of vnode file in days #define TSDB_MAX_CACHE_BLOCK_SIZE 1000000
#define TSDB_FILE_MAX_PARTITION_RANGE 3650 //max partition range of vnode file in days #define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16
#define TSDB_DATA_MIN_RESERVE_DAY 1 // data in db to be reserved. #define TSDB_MIN_TOTAL_BLOCKS 2
#define TSDB_DATA_DEFAULT_RESERVE_DAY 3650 // ten years #define TSDB_MAX_TOTAL_BLOCKS 10000
#define TSDB_DEFAULT_TOTAL_BLOCKS 2
#define TSDB_MIN_COMPRESSION_LEVEL 0 #define TSDB_MIN_TABLES 4
#define TSDB_MAX_COMPRESSION_LEVEL 2 #define TSDB_MAX_TABLES 200000
#define TSDB_DEFAULT_TABLES 1000
#define TSDB_MIN_COMMIT_TIME_INTERVAL 30 #define TSDB_MIN_DAYS_PER_FILE 1
#define TSDB_MAX_COMMIT_TIME_INTERVAL 40960 #define TSDB_MAX_DAYS_PER_FILE 3650
#define TSDB_DEFAULT_DAYS_PER_FILE 10
#define TSDB_MIN_ROWS_IN_FILEBLOCK 200 #define TSDB_MIN_KEEP 1 // data in db to be reserved.
#define TSDB_MAX_ROWS_IN_FILEBLOCK 500000 #define TSDB_MAX_KEEP 365000 // data in db to be reserved.
#define TSDB_DEFAULT_KEEP 3650 // ten years
#define TSDB_MIN_CACHE_BLOCK_SIZE 1 #define TSDB_DEFAULT_MIN_ROW_FBLOCK 100
#define TSDB_MAX_CACHE_BLOCK_SIZE 1000000 #define TSDB_MIN_MIN_ROW_FBLOCK 10
#define TSDB_MAX_MIN_ROW_FBLOCK 1000
#define TSDB_MIN_CACHE_BLOCKS 100 #define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096
#define TSDB_MAX_CACHE_BLOCKS 409600 #define TSDB_MIN_MAX_ROW_FBLOCK 200
#define TSDB_MAX_MAX_ROW_FBLOCK 10000
#define TSDB_MIN_AVG_BLOCKS 2 #define TSDB_MIN_COMMIT_TIME 30
#define TSDB_MAX_AVG_BLOCKS 2048 #define TSDB_MAX_COMMIT_TIME 40960
#define TSDB_DEFAULT_AVG_BLOCKS 4 #define TSDB_DEFAULT_COMMIT_TIME 3600
/* #define TSDB_MIN_PRECISION TSDB_PRECISION_MILLI
* There is a bug in function taosAllocateId. #define TSDB_MAX_PRECISION TSDB_PRECISION_NANO
* When "create database tables 1" is executed, the wrong sid is assigned, so the minimum value is set to 2. #define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI
*/
#define TSDB_MIN_TABLES_PER_VNODE 2 #define TSDB_MIN_COMP_LEVEL 0
#define TSDB_MAX_TABLES_PER_VNODE 220000 #define TSDB_MAX_COMP_LEVEL 2
#define TSDB_DEFAULT_COMP_LEVEL 2
#define TSDB_MIN_CLOG_LEVEL 0
#define TSDB_MAX_CLOG_LEVEL 2
#define TSDB_DEFAULT_CLOG_LEVEL 2
#define TSDB_MIN_REPLICA_NUM 1
#define TSDB_MAX_REPLICA_NUM 3
#define TSDB_DEFAULT_REPLICA_NUM 1
#define TSDB_MAX_JOIN_TABLE_NUM 5 #define TSDB_MAX_JOIN_TABLE_NUM 5
#define TSDB_MAX_UNION_CLAUSE 5 #define TSDB_MAX_UNION_CLAUSE 5
......
...@@ -97,6 +97,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE, 0, 203, "invalid table n ...@@ -97,6 +97,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE, 0, 203, "invalid table n
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 204, "no super table") // operation only available for super table TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 204, "no super table") // operation only available for super table
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_TABLE, 0, 205, "not active table") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_TABLE, 0, 205, "not active table")
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 206, "table id mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 206, "table id mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TAG_ALREAY_EXIST, 0, 207, "tag already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TAG_NOT_EXIST, 0, 208, "tag not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_ALREAY_EXIST, 0, 209, "field already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_NOT_EXIST, 0, 210, "field not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_COL_NAME_TOO_LONG, 0, 211, "column name too long")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TAGS, 0, 211, "too many tags")
// dnode & mnode // dnode & mnode
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DNODES, 0, 250, "no enough dnodes") TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DNODES, 0, 250, "no enough dnodes")
......
...@@ -238,7 +238,7 @@ typedef struct { ...@@ -238,7 +238,7 @@ typedef struct {
typedef struct SSchema { typedef struct SSchema {
uint8_t type; uint8_t type;
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN + 1];
int16_t colId; int16_t colId;
int16_t bytes; int16_t bytes;
} SSchema; } SSchema;
...@@ -256,14 +256,14 @@ typedef struct { ...@@ -256,14 +256,14 @@ typedef struct {
uint64_t uid; uint64_t uid;
uint64_t superTableUid; uint64_t superTableUid;
uint64_t createdTime; uint64_t createdTime;
char tableId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN]; char superTableId[TSDB_TABLE_ID_LEN + 1];
char data[]; char data[];
} SMDCreateTableMsg; } SMDCreateTableMsg;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN + 1];
int8_t igExists; int8_t igExists;
int16_t numOfTags; int16_t numOfTags;
int16_t numOfColumns; int16_t numOfColumns;
...@@ -274,13 +274,13 @@ typedef struct { ...@@ -274,13 +274,13 @@ typedef struct {
} SCMCreateTableMsg; } SCMCreateTableMsg;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t igNotExists; int8_t igNotExists;
} SCMDropTableMsg; } SCMDropTableMsg;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN + 1];
int16_t type; /* operation type */ int16_t type; /* operation type */
char tagVal[TSDB_MAX_BYTES_PER_ROW]; char tagVal[TSDB_MAX_BYTES_PER_ROW];
int8_t numOfCols; /* number of schema */ int8_t numOfCols; /* number of schema */
...@@ -345,7 +345,6 @@ typedef struct { ...@@ -345,7 +345,6 @@ typedef struct {
} SMDDropTableMsg; } SMDDropTableMsg;
typedef struct { typedef struct {
int32_t contLen;
int32_t vgId; int32_t vgId;
int64_t uid; int64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
...@@ -492,6 +491,7 @@ typedef struct SRetrieveTableRsp { ...@@ -492,6 +491,7 @@ typedef struct SRetrieveTableRsp {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t cfgVersion;
int64_t totalStorage; int64_t totalStorage;
int64_t compStorage; int64_t compStorage;
int64_t pointsWritten; int64_t pointsWritten;
...@@ -502,29 +502,23 @@ typedef struct { ...@@ -502,29 +502,23 @@ typedef struct {
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN + 1];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN + 1];
uint32_t vgId;
int32_t maxSessions; int32_t maxSessions;
int32_t cacheBlockSize; int32_t cacheBlockSize; //MB
union { int32_t totalBlocks;
int32_t totalBlocks; int32_t daysPerFile;
float fraction; int32_t daysToKeep1;
} cacheNumOfBlocks; int32_t daysToKeep2;
int32_t daysPerFile; int32_t daysToKeep;
int32_t daysToKeep1; int32_t commitTime;
int32_t daysToKeep2; int32_t minRowsPerFileBlock;
int32_t daysToKeep; int32_t maxRowsPerFileBlock;
int32_t commitTime; int8_t compression;
int32_t rowsInFileBlock; int8_t commitLog;
int16_t blocksPerTable; int8_t replications;
int8_t compression; uint8_t precision; // time resolution
int8_t commitLog; int8_t ignoreExist;
int8_t replications;
int8_t repStrategy;
int8_t loadLatest; // load into mem or not
uint8_t precision; // time resolution
int8_t ignoreExist;
} SCMCreateDbMsg, SCMAlterDbMsg; } SCMCreateDbMsg, SCMAlterDbMsg;
typedef struct { typedef struct {
...@@ -592,20 +586,22 @@ typedef struct { ...@@ -592,20 +586,22 @@ typedef struct {
typedef struct { typedef struct {
uint32_t vgId; uint32_t vgId;
int32_t cfgVersion;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t maxTables; int32_t maxTables;
int64_t maxCacheSize;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t daysPerFile; int32_t daysPerFile;
int32_t daysToKeep; int32_t daysToKeep;
int32_t daysToKeep1; int32_t daysToKeep1;
int32_t daysToKeep2; int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t commitTime; int32_t commitTime;
uint8_t precision; // time resolution int8_t precision;
int8_t compression; int8_t compression;
int8_t wals;
int8_t commitLog; int8_t commitLog;
int8_t replications; int8_t replications;
int8_t wals;
int8_t quorum; int8_t quorum;
uint32_t arbitratorIp; uint32_t arbitratorIp;
int8_t reserved[16]; int8_t reserved[16];
...@@ -640,7 +636,7 @@ typedef struct SCMSTableVgroupMsg { ...@@ -640,7 +636,7 @@ typedef struct SCMSTableVgroupMsg {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t numOfIps; int8_t numOfIps;
SIpAddr ipAddr[TSDB_REPLICA_MAX_NUM]; SIpAddr ipAddr[TSDB_MAX_REPLICA_NUM];
} SCMVgroupInfo; } SCMVgroupInfo;
typedef struct { typedef struct {
...@@ -684,7 +680,7 @@ typedef struct { ...@@ -684,7 +680,7 @@ typedef struct {
} SVnodeDesc; } SVnodeDesc;
typedef struct { typedef struct {
SVnodeDesc vpeerDesc[TSDB_REPLICA_MAX_NUM]; SVnodeDesc vpeerDesc[TSDB_MAX_REPLICA_NUM];
int16_t index; // used locally int16_t index; // used locally
int32_t vgId; int32_t vgId;
int32_t numOfSids; int32_t numOfSids;
...@@ -700,8 +696,8 @@ typedef struct { ...@@ -700,8 +696,8 @@ typedef struct {
typedef struct STableMetaMsg { typedef struct STableMetaMsg {
int32_t contLen; int32_t contLen;
char tableId[TSDB_TABLE_ID_LEN]; // table id char tableId[TSDB_TABLE_ID_LEN + 1]; // table id
char stableId[TSDB_TABLE_ID_LEN]; // stable name if it is created according to super table char stableId[TSDB_TABLE_ID_LEN + 1]; // stable name if it is created according to super table
uint8_t numOfTags; uint8_t numOfTags;
uint8_t precision; uint8_t precision;
uint8_t tableType; uint8_t tableType;
......
...@@ -45,15 +45,19 @@ typedef struct { ...@@ -45,15 +45,19 @@ typedef struct {
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION // --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef struct { typedef struct {
int8_t precision;
int8_t compression;
int32_t tsdbId; int32_t tsdbId;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t maxTables; // maximum number of tables this repository can have int32_t maxTables; // maximum number of tables this repository can have
int32_t daysPerFile; // day per file sharding policy int32_t daysPerFile; // day per file sharding policy
int32_t keep; // day of data to keep
int32_t keep1;
int32_t keep2;
int32_t minRowsPerFileBlock; // minimum rows per file block int32_t minRowsPerFileBlock; // minimum rows per file block
int32_t maxRowsPerFileBlock; // maximum rows per file block int32_t maxRowsPerFileBlock; // maximum rows per file block
int32_t keep; // day of data to keep int32_t commitTime;
int64_t maxCacheSize; // maximum cache size this TSDB can use int8_t precision;
int8_t compression;
} STsdbCfg; } STsdbCfg;
void tsdbSetDefaultCfg(STsdbCfg *pCfg); void tsdbSetDefaultCfg(STsdbCfg *pCfg);
......
...@@ -143,14 +143,15 @@ typedef struct SVgObj { ...@@ -143,14 +143,15 @@ typedef struct SVgObj {
} SVgObj; } SVgObj;
typedef struct { typedef struct {
int64_t maxCacheSize; int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t maxTables; int32_t maxTables;
int32_t daysPerFile; int32_t daysPerFile;
int32_t daysToKeep; int32_t daysToKeep;
int32_t daysToKeep1; int32_t daysToKeep1;
int32_t daysToKeep2; int32_t daysToKeep2;
int32_t minRowsPerFileBlock; // minimum rows per file block int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock; // maximum rows per file block int32_t maxRowsPerFileBlock;
int32_t commitTime; int32_t commitTime;
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
...@@ -163,6 +164,7 @@ typedef struct SDbObj { ...@@ -163,6 +164,7 @@ typedef struct SDbObj {
char name[TSDB_DB_NAME_LEN + 1]; char name[TSDB_DB_NAME_LEN + 1];
char acct[TSDB_USER_LEN + 1]; char acct[TSDB_USER_LEN + 1];
int64_t createdTime; int64_t createdTime;
int32_t cfgVersion;
SDbCfg cfg; SDbCfg cfg;
int8_t status; int8_t status;
int8_t reserved[14]; int8_t reserved[14];
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "ttime.h" #include "ttime.h"
#include "tname.h" #include "tname.h"
#include "tbalance.h"
#include "mgmtDef.h" #include "mgmtDef.h"
#include "mgmtLog.h" #include "mgmtLog.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
...@@ -177,78 +178,81 @@ SDbObj *mgmtGetDbByTableId(char *tableId) { ...@@ -177,78 +178,81 @@ SDbObj *mgmtGetDbByTableId(char *tableId) {
} }
static int32_t mgmtCheckDbCfg(SDbCfg *pCfg) { static int32_t mgmtCheckDbCfg(SDbCfg *pCfg) {
if (pCfg->maxCacheSize < TSDB_MIN_CACHE_BLOCK_SIZE || pCfg->maxCacheSize > TSDB_MAX_CACHE_BLOCK_SIZE) { if (pCfg->cacheBlockSize < TSDB_MIN_CACHE_BLOCK_SIZE || pCfg->cacheBlockSize > TSDB_MAX_CACHE_BLOCK_SIZE) {
mError("invalid db option maxCacheSize:%d valid range: [%d, %d]", pCfg->maxCacheSize, TSDB_MIN_CACHE_BLOCK_SIZE, mError("invalid db option cacheBlockSize:%d valid range: [%d, %d]", pCfg->cacheBlockSize, TSDB_MIN_CACHE_BLOCK_SIZE,
TSDB_MAX_CACHE_BLOCK_SIZE); TSDB_MAX_CACHE_BLOCK_SIZE);
} }
if (pCfg->maxTables < TSDB_MIN_TABLES_PER_VNODE || pCfg->maxTables > TSDB_MAX_TABLES_PER_VNODE) { if (pCfg->totalBlocks < TSDB_MIN_TOTAL_BLOCKS || pCfg->totalBlocks > TSDB_MAX_TOTAL_BLOCKS) {
mError("invalid db option maxTables:%d valid range: [%d, %d]", pCfg->maxTables, TSDB_MIN_TABLES_PER_VNODE, mError("invalid db option totalBlocks:%d valid range: [%d, %d]", pCfg->totalBlocks, TSDB_MIN_TOTAL_BLOCKS,
TSDB_MAX_TABLES_PER_VNODE); TSDB_MAX_TOTAL_BLOCKS);
}
if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) {
mError("invalid db option maxTables:%d valid range: [%d, %d]", pCfg->maxTables, TSDB_MIN_TABLES, TSDB_MAX_TABLES);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->daysPerFile < TSDB_FILE_MIN_PARTITION_RANGE || pCfg->daysPerFile > TSDB_FILE_MAX_PARTITION_RANGE) { if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) {
mError("invalid db option daysPerFile:%d valid range: [%d, %d]", pCfg->daysPerFile, TSDB_FILE_MIN_PARTITION_RANGE, mError("invalid db option daysPerFile:%d valid range: [%d, %d]", pCfg->daysPerFile, TSDB_MIN_DAYS_PER_FILE,
TSDB_FILE_MAX_PARTITION_RANGE); TSDB_MAX_DAYS_PER_FILE);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->daysToKeep1 < TSDB_FILE_MIN_PARTITION_RANGE || pCfg->daysToKeep1 < pCfg->daysPerFile) { if (pCfg->daysToKeep < TSDB_MIN_KEEP || pCfg->daysToKeep > TSDB_MAX_KEEP) {
mError("invalid db option daystokeep:%d", pCfg->daysToKeep); mError("invalid db option daysToKeep:%d", pCfg->daysToKeep);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->daysToKeep2 > pCfg->daysToKeep || pCfg->daysToKeep2 < pCfg->daysToKeep1) { if (pCfg->daysToKeep < pCfg->daysPerFile) {
mError("invalid db option daystokeep1:%d, daystokeep2:%d, daystokeep:%d", pCfg->daysToKeep1, mError("invalid db option daysToKeep:%d daysPerFile:%d", pCfg->daysToKeep, pCfg->daysPerFile);
pCfg->daysToKeep2, pCfg->daysToKeep);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->minRowsPerFileBlock < TSDB_MIN_ROWS_IN_FILEBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_ROWS_IN_FILEBLOCK) { if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK) {
mError("invalid db option minRowsPerFileBlock:%d valid range: [%d, %d]", pCfg->minRowsPerFileBlock, mError("invalid db option minRowsPerFileBlock:%d valid range: [%d, %d]", pCfg->minRowsPerFileBlock,
TSDB_MIN_ROWS_IN_FILEBLOCK, TSDB_MAX_ROWS_IN_FILEBLOCK); TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->maxRowsPerFileBlock < TSDB_MIN_ROWS_IN_FILEBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_ROWS_IN_FILEBLOCK) { if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) {
mError("invalid db option maxRowsPerFileBlock:%d valid range: [%d, %d]", pCfg->maxRowsPerFileBlock, mError("invalid db option maxRowsPerFileBlock:%d valid range: [%d, %d]", pCfg->maxRowsPerFileBlock,
TSDB_MIN_ROWS_IN_FILEBLOCK, TSDB_MAX_ROWS_IN_FILEBLOCK); TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->maxRowsPerFileBlock < pCfg->minRowsPerFileBlock) { if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) {
mError("invalid db option minRowsPerFileBlock:%d maxRowsPerFileBlock:%d", pCfg->minRowsPerFileBlock, mError("invalid db option minRowsPerFileBlock:%d maxRowsPerFileBlock:%d", pCfg->minRowsPerFileBlock,
pCfg->maxRowsPerFileBlock); pCfg->maxRowsPerFileBlock);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME_INTERVAL || pCfg->commitTime > TSDB_MAX_COMMIT_TIME_INTERVAL) { if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) {
mError("invalid db option commitTime:%d valid range: [%d, %d]", pCfg->commitTime, TSDB_MIN_COMMIT_TIME_INTERVAL, mError("invalid db option commitTime:%d valid range: [%d, %d]", pCfg->commitTime, TSDB_MIN_COMMIT_TIME,
TSDB_MAX_COMMIT_TIME_INTERVAL); TSDB_MAX_COMMIT_TIME);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->precision != TSDB_TIME_PRECISION_MILLI && pCfg->precision != TSDB_TIME_PRECISION_MICRO) { if (pCfg->precision != TSDB_MIN_PRECISION && pCfg->precision != TSDB_MAX_PRECISION) {
mError("invalid db option timePrecision:%d valid value: [%d, %d]", pCfg->precision, TSDB_TIME_PRECISION_MILLI, mError("invalid db option timePrecision:%d valid value: [%d, %d]", pCfg->precision, TSDB_MIN_PRECISION,
TSDB_TIME_PRECISION_MICRO); TSDB_MAX_PRECISION);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->compression < TSDB_MIN_COMPRESSION_LEVEL || pCfg->compression > TSDB_MAX_COMPRESSION_LEVEL) { if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) {
mError("invalid db option compression:%d valid range: [%d, %d]", pCfg->compression, TSDB_MIN_COMPRESSION_LEVEL, mError("invalid db option compression:%d valid range: [%d, %d]", pCfg->compression, TSDB_MIN_COMP_LEVEL,
TSDB_MAX_COMPRESSION_LEVEL); TSDB_MAX_COMP_LEVEL);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->commitLog < 0 || pCfg->commitLog > 2) { if (pCfg->commitLog < TSDB_MIN_CLOG_LEVEL || pCfg->commitLog > TSDB_MAX_CLOG_LEVEL) {
mError("invalid db option commitLog:%d, only 0-2 allowed", pCfg->commitLog); mError("invalid db option commitLog:%d, only 0-2 allowed", pCfg->commitLog);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
if (pCfg->replications < TSDB_REPLICA_MIN_NUM || pCfg->replications > TSDB_REPLICA_MAX_NUM) { if (pCfg->replications < TSDB_MIN_REPLICA_NUM || pCfg->replications > TSDB_MAX_REPLICA_NUM) {
mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_REPLICA_MIN_NUM, mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_REPLICA_NUM,
TSDB_REPLICA_MAX_NUM); TSDB_MAX_REPLICA_NUM);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
...@@ -256,14 +260,15 @@ static int32_t mgmtCheckDbCfg(SDbCfg *pCfg) { ...@@ -256,14 +260,15 @@ static int32_t mgmtCheckDbCfg(SDbCfg *pCfg) {
} }
static void mgmtSetDefaultDbCfg(SDbCfg *pCfg) { static void mgmtSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->maxCacheSize < 0) pCfg->maxCacheSize = tsMaxCacheSize; if (pCfg->cacheBlockSize < 0) pCfg->cacheBlockSize = tsCacheBlockSize;
if (pCfg->maxTables < 0) pCfg->maxTables = tsSessionsPerVnode; if (pCfg->totalBlocks < 0) pCfg->totalBlocks = tsTotalBlocks;
if (pCfg->maxTables < 0) pCfg->maxTables = tsTablesPerVnode;
if (pCfg->daysPerFile < 0) pCfg->daysPerFile = tsDaysPerFile; if (pCfg->daysPerFile < 0) pCfg->daysPerFile = tsDaysPerFile;
if (pCfg->daysToKeep < 0) pCfg->daysToKeep = tsDaysToKeep; if (pCfg->daysToKeep < 0) pCfg->daysToKeep = tsDaysToKeep;
if (pCfg->daysToKeep1 < 0) pCfg->daysToKeep1 = pCfg->daysToKeep; if (pCfg->daysToKeep1 < 0) pCfg->daysToKeep1 = pCfg->daysToKeep;
if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = pCfg->daysToKeep; if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = pCfg->daysToKeep;
if (pCfg->minRowsPerFileBlock < 0) pCfg->minRowsPerFileBlock = tsRowsInFileBlock; if (pCfg->minRowsPerFileBlock < 0) pCfg->minRowsPerFileBlock = tsMinRowsInFileBlock;
if (pCfg->maxRowsPerFileBlock < 0) pCfg->maxRowsPerFileBlock = pCfg->minRowsPerFileBlock * 2; if (pCfg->maxRowsPerFileBlock < 0) pCfg->maxRowsPerFileBlock = tsMaxRowsInFileBlock;
if (pCfg->commitTime < 0) pCfg->commitTime = tsCommitTime; if (pCfg->commitTime < 0) pCfg->commitTime = tsCommitTime;
if (pCfg->precision < 0) pCfg->precision = tsTimePrecision; if (pCfg->precision < 0) pCfg->precision = tsTimePrecision;
if (pCfg->compression < 0) pCfg->compression = tsCompression; if (pCfg->compression < 0) pCfg->compression = tsCompression;
...@@ -293,14 +298,15 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { ...@@ -293,14 +298,15 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
strncpy(pDb->acct, pAcct->user, TSDB_USER_LEN); strncpy(pDb->acct, pAcct->user, TSDB_USER_LEN);
pDb->createdTime = taosGetTimestampMs(); pDb->createdTime = taosGetTimestampMs();
pDb->cfg = (SDbCfg) { pDb->cfg = (SDbCfg) {
.maxCacheSize = 64,//(int64_t)pCreate->cacheBlockSize * pCreate->cacheNumOfBlocks.totalBlocks, .cacheBlockSize = pCreate->cacheBlockSize,
.totalBlocks = pCreate->totalBlocks,
.maxTables = pCreate->maxSessions, .maxTables = pCreate->maxSessions,
.daysPerFile = pCreate->daysPerFile, .daysPerFile = pCreate->daysPerFile,
.daysToKeep = pCreate->daysToKeep, .daysToKeep = pCreate->daysToKeep,
.daysToKeep1 = pCreate->daysToKeep1, .daysToKeep1 = pCreate->daysToKeep1,
.daysToKeep2 = pCreate->daysToKeep2, .daysToKeep2 = pCreate->daysToKeep2,
.minRowsPerFileBlock = pCreate->rowsInFileBlock * 1, .minRowsPerFileBlock = pCreate->maxRowsPerFileBlock,
.maxRowsPerFileBlock = pCreate->rowsInFileBlock * 2, .maxRowsPerFileBlock = pCreate->maxRowsPerFileBlock,
.commitTime = pCreate->commitTime, .commitTime = pCreate->commitTime,
.precision = pCreate->precision, .precision = pCreate->precision,
.compression = pCreate->compression, .compression = pCreate->compression,
...@@ -459,13 +465,25 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) ...@@ -459,13 +465,25 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
pShow->bytes[cols] = 4; pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT; pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "rows"); strcpy(pSchema[cols].name, "cache(MB)");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "blocks");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "minrows");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 4; pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT; pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "cache(Mb)"); strcpy(pSchema[cols].name, "maxrows");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -492,7 +510,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) ...@@ -492,7 +510,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
pShow->bytes[cols] = 3; pShow->bytes[cols] = 3;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "time precision"); strcpy(pSchema[cols].name, "precision");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -584,13 +602,21 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * ...@@ -584,13 +602,21 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.minRowsPerFileBlock; *(int32_t *)pWrite = pDb->cfg.cacheBlockSize;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxCacheSize; *(int32_t *)pWrite = pDb->cfg.totalBlocks;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.minRowsPerFileBlock;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxRowsPerFileBlock;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.commitTime; *(int32_t *)pWrite = pDb->cfg.commitTime;
cols++; cols++;
...@@ -664,14 +690,15 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { ...@@ -664,14 +690,15 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
SCMCreateDbMsg *pCreate = pMsg->pCont; SCMCreateDbMsg *pCreate = pMsg->pCont;
pCreate->maxSessions = htonl(pCreate->maxSessions); pCreate->maxSessions = htonl(pCreate->maxSessions);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->totalBlocks = htonl(pCreate->totalBlocks);
pCreate->daysPerFile = htonl(pCreate->daysPerFile); pCreate->daysPerFile = htonl(pCreate->daysPerFile);
pCreate->daysToKeep = htonl(pCreate->daysToKeep); pCreate->daysToKeep = htonl(pCreate->daysToKeep);
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1); pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2); pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
pCreate->commitTime = htonl(pCreate->commitTime); pCreate->commitTime = htonl(pCreate->commitTime);
pCreate->blocksPerTable = htons(pCreate->blocksPerTable); pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock); pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
int32_t code; int32_t code;
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_GRANT_EXPIRED; code = TSDB_CODE_GRANT_EXPIRED;
...@@ -688,40 +715,66 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { ...@@ -688,40 +715,66 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
} }
static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
SDbCfg newCfg = pDb->cfg; SDbCfg newCfg = pDb->cfg;
int32_t daysToKeep = htonl(pAlter->daysToKeep); int32_t cacheBlockSize = htonl(pAlter->daysToKeep);
int32_t maxTables = htonl(pAlter->maxSessions); int32_t totalBlocks = htonl(pAlter->totalBlocks);
int32_t maxTables = htonl(pAlter->maxSessions);
int32_t daysToKeep = htonl(pAlter->daysToKeep);
int32_t daysToKeep1 = htonl(pAlter->daysToKeep1);
int32_t daysToKeep2 = htonl(pAlter->daysToKeep2);
int8_t compression = pAlter->compression;
int8_t replications = pAlter->replications; int8_t replications = pAlter->replications;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (cacheBlockSize > 0 && cacheBlockSize != pDb->cfg.cacheBlockSize) {
mTrace("db:%s, cache:%d change to %d", pDb->name, pDb->cfg.cacheBlockSize, cacheBlockSize);
newCfg.cacheBlockSize = cacheBlockSize;
}
if (totalBlocks > 0 && totalBlocks != pDb->cfg.totalBlocks) {
mTrace("db:%s, blocks:%d change to %d", pDb->name, pDb->cfg.totalBlocks, totalBlocks);
newCfg.totalBlocks = totalBlocks;
}
if (maxTables > 0 && maxTables != pDb->cfg.maxTables) {
mTrace("db:%s, tables:%d change to %d", pDb->name, pDb->cfg.maxTables, maxTables);
newCfg.maxTables = maxTables;
if (newCfg.maxTables < pDb->cfg.maxTables) {
mTrace("db:%s, tables:%d should larger than origin:%d", pDb->name, newCfg.maxTables, pDb->cfg.maxTables);
terrno = TSDB_CODE_INVALID_OPTION;
}
}
if (daysToKeep > 0 && daysToKeep != pDb->cfg.daysToKeep) { if (daysToKeep > 0 && daysToKeep != pDb->cfg.daysToKeep) {
mTrace("db:%s, daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, daysToKeep); mTrace("db:%s, daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, daysToKeep);
newCfg.daysToKeep = daysToKeep; newCfg.daysToKeep = daysToKeep;
} }
if (daysToKeep1 > 0 && daysToKeep1 != pDb->cfg.daysToKeep1) {
mTrace("db:%s, daysToKeep1:%d change to %d", pDb->name, pDb->cfg.daysToKeep1, daysToKeep1);
newCfg.daysToKeep1 = daysToKeep1;
}
if (daysToKeep2 > 0 && daysToKeep2 != pDb->cfg.daysToKeep2) {
mTrace("db:%s, daysToKeep2:%d change to %d", pDb->name, pDb->cfg.daysToKeep2, daysToKeep2);
newCfg.daysToKeep2 = daysToKeep2;
}
if (compression > 0 && compression != pDb->cfg.compression) {
mTrace("db:%s, compression:%d change to %d", pDb->name, pDb->cfg.compression, compression);
newCfg.compression = compression;
}
if (replications > 0 && replications != pDb->cfg.replications) { if (replications > 0 && replications != pDb->cfg.replications) {
mTrace("db:%s, replica:%d change to %d", pDb->name, pDb->cfg.replications, replications); mTrace("db:%s, replications:%d change to %d", pDb->name, pDb->cfg.replications, replications);
if (replications < TSDB_REPLICA_MIN_NUM || replications > TSDB_REPLICA_MAX_NUM) {
mError("invalid db option replica: %d valid range: %d--%d", replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM);
terrno = TSDB_CODE_INVALID_OPTION;
}
newCfg.replications = replications; newCfg.replications = replications;
} }
if (replications > mgmtGetDnodesNum()) {
if (maxTables > 0 && maxTables != pDb->cfg.maxTables) { mError("db:%s, no enough dnode to change replica:%d", pDb->name, replications);
mTrace("db:%s, tables:%d change to %d", pDb->name, pDb->cfg.maxTables, maxTables); terrno = TSDB_CODE_NO_ENOUGH_DNODES;
if (maxTables < TSDB_MIN_TABLES_PER_VNODE || maxTables > TSDB_MAX_TABLES_PER_VNODE) {
mError("invalid db option tables: %d valid range: %d--%d", maxTables, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE);
terrno = TSDB_CODE_INVALID_OPTION;
}
if (maxTables < pDb->cfg.maxTables) {
mError("invalid db option tables: %d should larger than original:%d", maxTables, pDb->cfg.maxTables);
terrno = TSDB_CODE_INVALID_OPTION;
}
newCfg.maxTables = maxTables;
} }
return newCfg; return newCfg;
} }
...@@ -731,8 +784,16 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { ...@@ -731,8 +784,16 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
return terrno; return terrno;
} }
int32_t code = mgmtCheckDbCfg(&newCfg);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t oldReplica = pDb->cfg.replications;
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
pDb->cfg = newCfg; pDb->cfg = newCfg;
pDb->cfgVersion++;
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDbSdb, .table = tsDbSdb,
...@@ -745,7 +806,20 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { ...@@ -745,7 +806,20 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
} }
void *pNode = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
if (pVgroup == NULL) break;
mgmtSendCreateVgroupMsg(pVgroup, NULL);
mgmtDecVgroupRef(pVgroup);
}
if (oldReplica != pDb->cfg.replications) {
balanceNotify();
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -773,16 +847,6 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { ...@@ -773,16 +847,6 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
return; return;
} }
SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgroup:%d, will be altered", pVgroup->vgId);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtAlterVgroup(pVgroup, newMsg);
return;
}
mTrace("db:%s, all vgroups is altered", pDb->name); mTrace("db:%s, all vgroups is altered", pDb->name);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
} }
......
...@@ -336,13 +336,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -336,13 +336,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
} else { } else {
//mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess);
} }
int32_t openVnodes = htons(pStatus->openVnodes); int32_t openVnodes = htons(pStatus->openVnodes);
for (int32_t j = 0; j < openVnodes; ++j) { for (int32_t j = 0; j < openVnodes; ++j) {
SVnodeLoad *pVload = &pStatus->load[j]; SVnodeLoad *pVload = &pStatus->load[j];
pVload->vgId = htonl(pVload->vgId); pVload->vgId = htonl(pVload->vgId);
pVload->cfgVersion = htonl(pVload->cfgVersion);
SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId); SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
......
...@@ -78,6 +78,8 @@ static void mgmtGetChildTableMeta(SQueuedMsg *pMsg); ...@@ -78,6 +78,8 @@ static void mgmtGetChildTableMeta(SQueuedMsg *pMsg);
static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg);
static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName);
static void mgmtDestroyChildTable(SChildTableObj *pTable) { static void mgmtDestroyChildTable(SChildTableObj *pTable) {
tfree(pTable->schema); tfree(pTable->schema);
tfree(pTable->sql); tfree(pTable->sql);
...@@ -756,8 +758,26 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { ...@@ -756,8 +758,26 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) {
static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable;
if (pStable->numOfTables != 0) { if (pStable->numOfTables != 0) {
mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables); mgmtDropAllChildTablesInStable(pStable);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS); for (int32_t vg = 0; vg < pStable->vgLen; ++vg) {
int32_t vgId = pStable->vgList[vg];
if (vgId == 0) break;
SMDDropSTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropSTableMsg));
pDrop->vgId = htonl(vgId);
pDrop->uid = htobe64(pStable->uid);
mgmtExtractTableName(pStable->info.tableId, pDrop->tableId);
SVgObj *pVgroup = mgmtGetVgroup(vgId);
if (pVgroup != NULL) {
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
mgmtDecVgroupRef(pVgroup);
}
}
//mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables);
//mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS);
} else { } else {
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
...@@ -783,31 +803,33 @@ static int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *t ...@@ -783,31 +803,33 @@ static int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *t
static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) { static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) {
if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) { if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) {
return TSDB_CODE_APP_ERROR; mError("stable:%s, add tag, too many tags", pStable->info.tableId);
return TSDB_CODE_TOO_MANY_TAGS;
} }
// check if schemas have the same name for (int32_t i = 0; i < ntags; i++) {
for (int32_t i = 1; i < ntags; i++) { if (mgmtFindSuperTableColumnIndex(pStable, schema[i].name) > 0) {
for (int32_t j = 0; j < i; j++) { mError("stable:%s, add tag, column:%s already exist", pStable->info.tableId, schema[i].name);
if (strcasecmp(schema[i].name, schema[j].name) == 0) { return TSDB_CODE_TAG_ALREAY_EXIST;
return TSDB_CODE_APP_ERROR; }
}
if (mgmtFindSuperTableTagIndex(pStable, schema[i].name) > 0) {
mError("stable:%s, add tag, tag:%s already exist", pStable->info.tableId, schema[i].name);
return TSDB_CODE_FIELD_ALREAY_EXIST;
} }
} }
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ntags); pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ntags);
memmove(pStable->schema + sizeof(SSchema) * (pStable->numOfColumns + ntags), memcpy(pStable->schema + pStable->numOfColumns + pStable->numOfTags, schema, sizeof(SSchema) * ntags);
pStable->schema + sizeof(SSchema) * pStable->numOfColumns, sizeof(SSchema) * pStable->numOfTags);
memcpy(pStable->schema + sizeof(SSchema) * pStable->numOfColumns, schema, sizeof(SSchema) * ntags);
SSchema *tschema = (SSchema *) (pStable->schema + sizeof(SSchema) * pStable->numOfColumns); SSchema *tschema = (SSchema *)(pStable->schema + pStable->numOfColumns + pStable->numOfTags);
for (int32_t i = 0; i < ntags; i++) { for (int32_t i = 0; i < ntags; i++) {
tschema[i].colId = pStable->nextColId++; tschema[i].colId = pStable->nextColId++;
} }
pStable->numOfColumns += ntags; pStable->numOfTags += ntags;
pStable->sversion++; pStable->sversion++;
SSdbOper oper = { SSdbOper oper = {
...@@ -822,25 +844,22 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i ...@@ -822,25 +844,22 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
mPrint("table %s, succeed to add tag %s", pStable->info.tableId, schema[0].name); mPrint("stable %s, succeed to add tag %s", pStable->info.tableId, schema[0].name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
int32_t col = mgmtFindSuperTableTagIndex(pStable, tagName); int32_t col = mgmtFindSuperTableTagIndex(pStable, tagName);
if (col <= 0 || col >= pStable->numOfTags) { if (col < 0) {
return TSDB_CODE_APP_ERROR; mError("stable:%s, drop tag, tag:%s not exist", pStable->info.tableId, tagName);
return TSDB_CODE_TAG_NOT_EXIST;
} }
memmove(pStable->schema + sizeof(SSchema) * col, pStable->schema + sizeof(SSchema) * (col + 1), memmove(pStable->schema + pStable->numOfColumns + col, pStable->schema + pStable->numOfColumns + col + 1,
sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1)); sizeof(SSchema) * (pStable->numOfTags - col - 1));
pStable->numOfTags--; pStable->numOfTags--;
pStable->sversion++; pStable->sversion++;
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize);
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
...@@ -853,27 +872,29 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { ...@@ -853,27 +872,29 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
mPrint("table %s, succeed to drop tag %s", pStable->info.tableId, tagName); mPrint("stable %s, succeed to drop tag %s", pStable->info.tableId, tagName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) { static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) {
int32_t col = mgmtFindSuperTableTagIndex(pStable, oldTagName); int32_t col = mgmtFindSuperTableTagIndex(pStable, oldTagName);
if (col < 0) { if (col < 0) {
// Tag name does not exist mError("stable:%s, failed to modify table tag, oldName: %s, newName: %s", pStable->info.tableId, oldTagName, newTagName);
mError("table:%s, failed to modify table tag, oldName: %s, newName: %s", pStable->info.tableId, oldTagName, newTagName); return TSDB_CODE_TAG_NOT_EXIST;
return TSDB_CODE_INVALID_MSG_TYPE;
} }
// int32_t rowSize = 0; // int32_t rowSize = 0;
uint32_t len = strlen(newTagName); uint32_t len = strlen(newTagName);
if (len >= TSDB_COL_NAME_LEN) {
return TSDB_CODE_COL_NAME_TOO_LONG;
}
if (col >= pStable->numOfTags || len >= TSDB_COL_NAME_LEN || mgmtFindSuperTableTagIndex(pStable, newTagName) >= 0) { if (mgmtFindSuperTableTagIndex(pStable, newTagName) >= 0) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_TAG_ALREAY_EXIST;
} }
// update // update
SSchema *schema = (SSchema *) (pStable->schema + (pStable->numOfColumns + col) * sizeof(SSchema)); SSchema *schema = (SSchema *) (pStable->schema + pStable->numOfColumns + col);
strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN); strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN);
SSdbOper oper = { SSdbOper oper = {
...@@ -888,15 +909,15 @@ static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTag ...@@ -888,15 +909,15 @@ static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTag
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
mPrint("table %s, succeed to modify tag %s to %s", pStable->info.tableId, oldTagName, newTagName); mPrint("stable %s, succeed to modify tag %s to %s", pStable->info.tableId, oldTagName, newTagName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) { static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) {
SSchema *schema = (SSchema *) pStable->schema; SSchema *schema = (SSchema *) pStable->schema;
for (int32_t i = 0; i < pStable->numOfColumns; i++) { for (int32_t col = 0; col < pStable->numOfColumns; col++) {
if (strcasecmp(schema[i].name, colName) == 0) { if (strcasecmp(schema[col].name, colName) == 0) {
return i; return col;
} }
} }
...@@ -905,21 +926,28 @@ static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colN ...@@ -905,21 +926,28 @@ static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colN
static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSchema schema[], int32_t ncols) { static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSchema schema[], int32_t ncols) {
if (ncols <= 0) { if (ncols <= 0) {
mError("stable:%s, add column, ncols:%d <= 0", pStable->info.tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
for (int32_t i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
if (mgmtFindSuperTableColumnIndex(pStable, schema[i].name) > 0) { if (mgmtFindSuperTableColumnIndex(pStable, schema[i].name) > 0) {
return TSDB_CODE_APP_ERROR; mError("stable:%s, add column, column:%s already exist", pStable->info.tableId, schema[i].name);
return TSDB_CODE_FIELD_ALREAY_EXIST;
}
if (mgmtFindSuperTableTagIndex(pStable, schema[i].name) > 0) {
mError("stable:%s, add column, tag:%s already exist", pStable->info.tableId, schema[i].name);
return TSDB_CODE_TAG_ALREAY_EXIST;
} }
} }
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ncols); pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ncols);
memmove(pStable->schema + sizeof(SSchema) * (pStable->numOfColumns + ncols), memmove(pStable->schema + pStable->numOfColumns + ncols, pStable->schema + pStable->numOfColumns,
pStable->schema + sizeof(SSchema) * pStable->numOfColumns, sizeof(SSchema) * pStable->numOfTags); sizeof(SSchema) * pStable->numOfTags);
memcpy(pStable->schema + sizeof(SSchema) * pStable->numOfColumns, schema, sizeof(SSchema) * ncols); memcpy(pStable->schema + pStable->numOfColumns, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *) (pStable->schema + sizeof(SSchema) * pStable->numOfColumns); SSchema *tschema = (SSchema *) (pStable->schema + sizeof(SSchema) * pStable->numOfColumns);
for (int32_t i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
...@@ -947,17 +975,18 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc ...@@ -947,17 +975,18 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
mPrint("table %s, succeed to add column", pStable->info.tableId); mPrint("stable %s, succeed to add column", pStable->info.tableId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, char *colName) { static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, char *colName) {
int32_t col = mgmtFindSuperTableColumnIndex(pStable, colName); int32_t col = mgmtFindSuperTableColumnIndex(pStable, colName);
if (col < 0) { if (col <= 0) {
return TSDB_CODE_APP_ERROR; mError("stable:%s, drop column, column:%s not exist", pStable->info.tableId, colName);
return TSDB_CODE_FIELD_NOT_EXIST;
} }
memmove(pStable->schema + sizeof(SSchema) * col, pStable->schema + sizeof(SSchema) * (col + 1), memmove(pStable->schema + col, pStable->schema + col + 1,
sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1)); sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1));
pStable->numOfColumns--; pStable->numOfColumns--;
...@@ -984,7 +1013,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch ...@@ -984,7 +1013,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
mPrint("table %s, succeed to delete column", pStable->info.tableId); mPrint("stable %s, succeed to delete column", pStable->info.tableId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1472,9 +1501,9 @@ static int32_t mgmtModifyChildTableTagValue(SChildTableObj *pTable, char *tagNam ...@@ -1472,9 +1501,9 @@ static int32_t mgmtModifyChildTableTagValue(SChildTableObj *pTable, char *tagNam
static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) { static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colName) {
SSchema *schema = (SSchema *) pTable->schema; SSchema *schema = (SSchema *) pTable->schema;
for (int32_t i = 0; i < pTable->numOfColumns; i++) { for (int32_t col = 0; col < pTable->numOfColumns; col++) {
if (strcasecmp(schema[i].name, colName) == 0) { if (strcasecmp(schema[col].name, colName) == 0) {
return i; return col;
} }
} }
...@@ -1483,21 +1512,23 @@ static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colN ...@@ -1483,21 +1512,23 @@ static int32_t mgmtFindNormalTableColumnIndex(SChildTableObj *pTable, char *colN
static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSchema schema[], int32_t ncols) { static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSchema schema[], int32_t ncols) {
if (ncols <= 0) { if (ncols <= 0) {
mError("table:%s, add column, ncols:%d <= 0", pTable->info.tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
for (int32_t i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
if (mgmtFindNormalTableColumnIndex(pTable, schema[i].name) > 0) { if (mgmtFindNormalTableColumnIndex(pTable, schema[i].name) > 0) {
return TSDB_CODE_APP_ERROR; mError("table:%s, add column, column:%s already exist", pTable->info.tableId, schema[i].name);
return TSDB_CODE_FIELD_ALREAY_EXIST;
} }
} }
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = realloc(pTable->schema, schemaSize + sizeof(SSchema) * ncols); pTable->schema = realloc(pTable->schema, schemaSize + sizeof(SSchema) * ncols);
memcpy(pTable->schema + schemaSize, schema, sizeof(SSchema) * ncols); memcpy(pTable->schema + pTable->numOfColumns, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *) (pTable->schema + sizeof(SSchema) * pTable->numOfColumns); SSchema *tschema = (SSchema *) (pTable->schema + pTable->numOfColumns);
for (int32_t i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
tschema[i].colId = pTable->nextColId++; tschema[i].colId = pTable->nextColId++;
} }
...@@ -1507,7 +1538,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc ...@@ -1507,7 +1538,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc
SAcctObj *pAcct = mgmtGetAcct(pDb->acct); SAcctObj *pAcct = mgmtGetAcct(pDb->acct);
if (pAcct != NULL) { if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries += ncols; pAcct->acctInfo.numOfTimeSeries += ncols;
mgmtDecAcctRef(pAcct); mgmtDecAcctRef(pAcct);
} }
...@@ -1529,13 +1560,12 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc ...@@ -1529,13 +1560,12 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc
static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, char *colName) { static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, char *colName) {
int32_t col = mgmtFindNormalTableColumnIndex(pTable, colName); int32_t col = mgmtFindNormalTableColumnIndex(pTable, colName);
if (col < 0) { if (col <= 0) {
return TSDB_CODE_APP_ERROR; mError("table:%s, drop column, column:%s not exist", pTable->info.tableId, colName);
return TSDB_CODE_FIELD_NOT_EXIST;
} }
memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1), memmove(pTable->schema + col, pTable->schema + col + 1, sizeof(SSchema) * (pTable->numOfColumns - col - 1));
sizeof(SSchema) * (pTable->numOfColumns - col - 1));
pTable->numOfColumns--; pTable->numOfColumns--;
pTable->sversion++; pTable->sversion++;
...@@ -1557,7 +1587,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch ...@@ -1557,7 +1587,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
mPrint("table %s, succeed to add column %s", pTable->info.tableId, colName); mPrint("table %s, succeed to drop column %s", pTable->info.tableId, colName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2078,7 +2108,8 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { ...@@ -2078,7 +2108,8 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
return; return;
} }
pAlter->numOfCols = htons(pAlter->numOfCols); pAlter->type = htons(pAlter->type);
if (pAlter->numOfCols > 2) { if (pAlter->numOfCols > 2) {
mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR);
......
...@@ -272,8 +272,9 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo ...@@ -272,8 +272,9 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo
pVgroup->pointsWritten = htobe64(pVload->pointsWritten); pVgroup->pointsWritten = htobe64(pVload->pointsWritten);
} }
if (pVload->replica != pVgroup->numOfVnodes) { if (pVload->cfgVersion != pVgroup->pDb->cfgVersion || pVload->replica != pVgroup->numOfVnodes) {
mError("dnode:%d, vgroup:%d replica:%d not match with mgmt:%d", pDnode->dnodeId, pVload->vgId, pVload->replica, mError("dnode:%d, vgroup:%d, vnode cfgVersion:%d repica:%d not match with mgmt cfgVersion:%d replica:%d",
pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion,
pVgroup->numOfVnodes); pVgroup->numOfVnodes);
mgmtSendCreateVgroupMsg(pVgroup, NULL); mgmtSendCreateVgroupMsg(pVgroup, NULL);
} }
...@@ -535,23 +536,22 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { ...@@ -535,23 +536,22 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
SMDVnodeCfg *pCfg = &pVnode->cfg; SMDVnodeCfg *pCfg = &pVnode->cfg;
pCfg->vgId = htonl(pVgroup->vgId); pCfg->vgId = htonl(pVgroup->vgId);
pCfg->cfgVersion = htonl(pDb->cfgVersion);
pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks);
pCfg->maxTables = htonl(pDb->cfg.maxTables); pCfg->maxTables = htonl(pDb->cfg.maxTables);
pCfg->maxCacheSize = htobe64(pDb->cfg.maxCacheSize);
pCfg->maxCacheSize = htobe64(-1); //TODO
pCfg->minRowsPerFileBlock = htonl(-1); //TODO
pCfg->maxRowsPerFileBlock = htonl(-1); //TODO
pCfg->daysPerFile = htonl(pDb->cfg.daysPerFile); pCfg->daysPerFile = htonl(pDb->cfg.daysPerFile);
pCfg->daysToKeep1 = htonl(pDb->cfg.daysToKeep1);
pCfg->daysToKeep2 = htonl(pDb->cfg.daysToKeep2);
pCfg->daysToKeep = htonl(pDb->cfg.daysToKeep); pCfg->daysToKeep = htonl(pDb->cfg.daysToKeep);
pCfg->daysToKeep = htonl(-1); //TODO pCfg->daysToKeep1 = htonl(pDb->cfg.daysToKeep1);
pCfg->daysToKeep2 = htonl(pDb->cfg.daysToKeep2);
pCfg->minRowsPerFileBlock = htonl(pDb->cfg.minRowsPerFileBlock);
pCfg->maxRowsPerFileBlock = htonl(pDb->cfg.maxRowsPerFileBlock);
pCfg->commitTime = htonl(pDb->cfg.commitTime); pCfg->commitTime = htonl(pDb->cfg.commitTime);
pCfg->precision = pDb->cfg.precision; pCfg->precision = pDb->cfg.precision;
pCfg->compression = pDb->cfg.compression; pCfg->compression = pDb->cfg.compression;
pCfg->compression = -1;
pCfg->wals = 3;
pCfg->commitLog = pDb->cfg.commitLog; pCfg->commitLog = pDb->cfg.commitLog;
pCfg->replications = (int8_t) pVgroup->numOfVnodes; pCfg->replications = (int8_t) pVgroup->numOfVnodes;
pCfg->wals = 3;
pCfg->quorum = 1; pCfg->quorum = 1;
SMDVnodeDesc *pNodes = pVnode->nodes; SMDVnodeDesc *pNodes = pVnode->nodes;
...@@ -771,15 +771,3 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) { ...@@ -771,15 +771,3 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups); mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups);
} }
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle) {
assert(ahandle != NULL);
if (pVgroup->numOfVnodes != pVgroup->pDb->cfg.replications) {
// TODO:
// mgmtSendAlterVgroupMsg(pVgroup, NULL);
} else {
mgmtAddToShellQueue(ahandle);
}
}
...@@ -132,9 +132,6 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); ...@@ -132,9 +132,6 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid); STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
char * getTupleKey(const void *data); char * getTupleKey(const void *data);
// ------------------------------ TSDB CACHE INTERFACES ------------------------------
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */
typedef struct { typedef struct {
int blockId; int blockId;
int offset; int offset;
......
...@@ -26,6 +26,9 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, TsdbRepoT *pRepo) { ...@@ -26,6 +26,9 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, TsdbRepoT *pRepo) {
if (pCache == NULL) return NULL; if (pCache == NULL) return NULL;
if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE; if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
cacheBlockSize *= (1024 * 1024);
if (maxBytes < 0) maxBytes = cacheBlockSize * TSDB_DEFAULT_TOTAL_BLOCKS;
pCache->maxBytes = maxBytes; pCache->maxBytes = maxBytes;
pCache->cacheBlockSize = cacheBlockSize; pCache->cacheBlockSize = cacheBlockSize;
......
#include "os.h" #include "os.h"
#include "taosdef.h"
#include "tulog.h" #include "tulog.h"
#include "talgo.h" #include "talgo.h"
#include "tsdb.h" #include "tsdb.h"
...@@ -11,24 +12,6 @@ ...@@ -11,24 +12,6 @@
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) #define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
#define TSDB_MIN_ID 0 #define TSDB_MIN_ID 0
#define TSDB_MAX_ID INT_MAX #define TSDB_MAX_ID INT_MAX
#define TSDB_MIN_TABLES 4
#define TSDB_MAX_TABLES 100000
#define TSDB_DEFAULT_TABLES 1000
#define TSDB_DEFAULT_DAYS_PER_FILE 10
#define TSDB_MIN_DAYS_PER_FILE 1
#define TSDB_MAX_DAYS_PER_FILE 60
#define TSDB_DEFAULT_MIN_ROW_FBLOCK 100
#define TSDB_MIN_MIN_ROW_FBLOCK 10
#define TSDB_MAX_MIN_ROW_FBLOCK 1000
#define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096
#define TSDB_MIN_MAX_ROW_FBLOCK 200
#define TSDB_MAX_MAX_ROW_FBLOCK 10000
#define TSDB_DEFAULT_KEEP 3650
#define TSDB_MIN_KEEP 1
#define TSDB_MAX_KEEP INT_MAX
#define TSDB_DEFAULT_CACHE_SIZE (16 * 1024 * 1024) // 16M
#define TSDB_MIN_CACHE_SIZE (4 * 1024 * 1024) // 4M
#define TSDB_MAX_CACHE_SIZE (1024 * 1024 * 1024) // 1G
#define TSDB_CFG_FILE_NAME "CONFIG" #define TSDB_CFG_FILE_NAME "CONFIG"
#define TSDB_DATA_DIR_NAME "data" #define TSDB_DATA_DIR_NAME "data"
...@@ -70,7 +53,6 @@ void tsdbSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -70,7 +53,6 @@ void tsdbSetDefaultCfg(STsdbCfg *pCfg) {
pCfg->minRowsPerFileBlock = -1; pCfg->minRowsPerFileBlock = -1;
pCfg->maxRowsPerFileBlock = -1; pCfg->maxRowsPerFileBlock = -1;
pCfg->keep = -1; pCfg->keep = -1;
pCfg->maxCacheSize = -1;
pCfg->compression = TWO_STAGE_COMP; pCfg->compression = TWO_STAGE_COMP;
} }
...@@ -220,7 +202,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { ...@@ -220,7 +202,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
return NULL; return NULL;
} }
pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1, (TsdbRepoT *)pRepo); pRepo->tsdbCache = tsdbInitCache(-1, -1, (TsdbRepoT *)pRepo);
if (pRepo->tsdbCache == NULL) { if (pRepo->tsdbCache == NULL) {
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo->rootDir); free(pRepo->rootDir);
...@@ -650,13 +632,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -650,13 +632,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
if (pCfg->keep < TSDB_MIN_KEEP || pCfg->keep > TSDB_MAX_KEEP) return -1; if (pCfg->keep < TSDB_MIN_KEEP || pCfg->keep > TSDB_MAX_KEEP) return -1;
} }
// Check maxCacheSize
if (pCfg->maxCacheSize == -1) {
pCfg->maxCacheSize = TSDB_DEFAULT_CACHE_SIZE;
} else {
if (pCfg->maxCacheSize < TSDB_MIN_CACHE_SIZE || pCfg->maxCacheSize > TSDB_MAX_CACHE_SIZE) return -1;
}
return 0; return 0;
} }
......
...@@ -36,6 +36,7 @@ typedef struct { ...@@ -36,6 +36,7 @@ typedef struct {
void *sync; void *sync;
void *events; void *events;
void *cq; // continuous query void *cq; // continuous query
int32_t cfgVersion;
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
SSyncCfg syncCfg; SSyncCfg syncCfg;
SWalCfg walCfg; SWalCfg walCfg;
......
...@@ -104,8 +104,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -104,8 +104,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock; tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep; tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep;
tsdbCfg.maxCacheSize = pVnodeCfg->cfg.maxCacheSize;
char tsdbDir[TSDB_FILENAME_LEN] = {0}; char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
...@@ -332,6 +331,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { ...@@ -332,6 +331,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId); pLoad->vgId = htonl(pVnode->vgId);
pLoad->cfgVersion = htonl(pVnode->cfgVersion);
pLoad->totalStorage = htobe64(pLoad->totalStorage); pLoad->totalStorage = htobe64(pLoad->totalStorage);
pLoad->compStorage = htobe64(pLoad->compStorage); pLoad->compStorage = htobe64(pLoad->compStorage);
pLoad->pointsWritten = htobe64(pLoad->pointsWritten); pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
...@@ -390,26 +390,28 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -390,26 +390,28 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks);
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables); len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile); len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile);
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep);
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnodeCfg->cfg.daysToKeep1);
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep); len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
len += snprintf(content + len, maxLen - len, " \"maxCacheSize\": %" PRId64 ",\n", pVnodeCfg->cfg.maxCacheSize); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog); len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
uint32_t ipInt = pVnodeCfg->cfg.arbitratorIp; uint32_t ipInt = pVnodeCfg->cfg.arbitratorIp;
sprintf(ipStr, "%u.%u.%u.%u", ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24)); sprintf(ipStr, "%u.%u.%u.%u", ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
len += snprintf(content + len, maxLen - len, " \"arbitratorIp\": \"%s\",\n", ipStr); len += snprintf(content + len, maxLen - len, " \"arbitratorIp\": \"%s\",\n", ipStr);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId); len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
...@@ -463,19 +465,26 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -463,19 +465,26 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON *precision = cJSON_GetObjectItem(root, "precision"); cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
if (!precision || precision->type != cJSON_Number) { if (!cfgVersion || cfgVersion->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, precision not found", pVnode, pVnode->vgId); dError("pVnode:%p vgId:%d, failed to read vnode cfg, cfgVersion not found", pVnode, pVnode->vgId);
goto PARSE_OVER; goto PARSE_OVER;
} }
pVnode->tsdbCfg.precision = (int8_t)precision->valueint; pVnode->cfgVersion = cfgVersion->valueint;
cJSON *compression = cJSON_GetObjectItem(root, "compression"); cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!compression || compression->type != cJSON_Number) { if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, compression not found", pVnode, pVnode->vgId); dError("pVnode:%p vgId:%d, failed to read vnode cfg, cacheBlockSize not found", pVnode, pVnode->vgId);
goto PARSE_OVER; goto PARSE_OVER;
} }
pVnode->tsdbCfg.compression = (int8_t)compression->valueint; pVnode->tsdbCfg.cacheBlockSize = cacheBlockSize->valueint;
cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
if (!totalBlocks || totalBlocks->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, totalBlocks not found", pVnode, pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables"); cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
if (!maxTables || maxTables->type != cJSON_Number) { if (!maxTables || maxTables->type != cJSON_Number) {
...@@ -484,13 +493,34 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -484,13 +493,34 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
pVnode->tsdbCfg.maxTables = maxTables->valueint; pVnode->tsdbCfg.maxTables = maxTables->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) { if (!daysPerFile || daysPerFile->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode, pVnode->vgId); dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode, pVnode->vgId);
goto PARSE_OVER; goto PARSE_OVER;
} }
pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint; pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint;
cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep");
if (!daysToKeep || daysToKeep->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysToKeep not found", pVnode, pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.keep = daysToKeep->valueint;
cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysToKeep1 not found", pVnode, pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.keep1 = daysToKeep1->valueint;
cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysToKeep2 not found", pVnode, pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.keep2 = daysToKeep2->valueint;
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, minRowsPerFileBlock not found", pVnode, pVnode->vgId); dError("pVnode:%p vgId:%d, failed to read vnode cfg, minRowsPerFileBlock not found", pVnode, pVnode->vgId);
...@@ -505,19 +535,26 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -505,19 +535,26 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint; pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep"); cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
if (!daysToKeep || daysToKeep->type != cJSON_Number) { if (!commitTime || commitTime->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysToKeep not found", pVnode, pVnode->vgId); dError("pVnode:%p vgId:%d, failed to read vnode cfg, commitTime not found", pVnode, pVnode->vgId);
goto PARSE_OVER; goto PARSE_OVER;
} }
pVnode->tsdbCfg.keep = daysToKeep->valueint; pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
cJSON *maxCacheSize = cJSON_GetObjectItem(root, "maxCacheSize"); cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!maxCacheSize || maxCacheSize->type != cJSON_Number) { if (!precision || precision->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, maxCacheSize not found", pVnode, pVnode->vgId); dError("pVnode:%p vgId:%d, failed to read vnode cfg, precision not found", pVnode, pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.precision = (int8_t)precision->valueint;
cJSON *compression = cJSON_GetObjectItem(root, "compression");
if (!compression || compression->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, compression not found", pVnode, pVnode->vgId);
goto PARSE_OVER; goto PARSE_OVER;
} }
pVnode->tsdbCfg.maxCacheSize = maxCacheSize->valueint; pVnode->tsdbCfg.compression = (int8_t)compression->valueint;
cJSON *commitLog = cJSON_GetObjectItem(root, "commitLog"); cJSON *commitLog = cJSON_GetObjectItem(root, "commitLog");
if (!commitLog || commitLog->type != cJSON_Number) { if (!commitLog || commitLog->type != cJSON_Number) {
...@@ -534,12 +571,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -534,12 +571,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
pVnode->walCfg.wals = (int8_t)wals->valueint; pVnode->walCfg.wals = (int8_t)wals->valueint;
pVnode->walCfg.keep = 0; pVnode->walCfg.keep = 0;
cJSON *arbitratorIp = cJSON_GetObjectItem(root, "arbitratorIp"); cJSON *replica = cJSON_GetObjectItem(root, "replica");
if (!arbitratorIp || arbitratorIp->type != cJSON_String || arbitratorIp->valuestring == NULL) { if (!replica || replica->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, arbitratorIp not found", pVnode, pVnode->vgId); dError("pVnode:%p vgId:%d, failed to read vnode cfg, replica not found", pVnode, pVnode->vgId);
goto PARSE_OVER; goto PARSE_OVER;
} }
pVnode->syncCfg.arbitratorIp = inet_addr(arbitratorIp->valuestring); pVnode->syncCfg.replica = (int8_t)replica->valueint;
cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
if (!quorum || quorum->type != cJSON_Number) { if (!quorum || quorum->type != cJSON_Number) {
...@@ -548,12 +585,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -548,12 +585,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
pVnode->syncCfg.quorum = (int8_t)quorum->valueint; pVnode->syncCfg.quorum = (int8_t)quorum->valueint;
cJSON *replica = cJSON_GetObjectItem(root, "replica"); cJSON *arbitratorIp = cJSON_GetObjectItem(root, "arbitratorIp");
if (!replica || replica->type != cJSON_Number) { if (!arbitratorIp || arbitratorIp->type != cJSON_String || arbitratorIp->valuestring == NULL) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, replica not found", pVnode, pVnode->vgId); dError("pVnode:%p vgId:%d, failed to read vnode cfg, arbitratorIp not found", pVnode, pVnode->vgId);
goto PARSE_OVER; goto PARSE_OVER;
} }
pVnode->syncCfg.replica = (int8_t)replica->valueint; pVnode->syncCfg.arbitratorIp = inet_addr(arbitratorIp->valuestring);
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) { if (!nodeInfos || nodeInfos->type != cJSON_Array) {
......
...@@ -224,11 +224,10 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -224,11 +224,10 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
int32_t code = 0; int32_t code = 0;
dTrace("pVnode:%p vgId:%d, stable:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId); dTrace("pVnode:%p vgId:%d, stable:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId);
// int64_t uid = htobe64(pTable->uid);
// TODO: drop stable in vvnode // TODO: drop stable in vvnode
//int64_t uid = htobe64(pTable->uid);
//void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); //void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid); //rpcRsp.code = tsdbDropTable(pTsdb, pTable->uid);
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
dTrace("pVnode:%p vgId:%d, stable:%s, drop stable result:%x", pVnode, pTable->tableId, code); dTrace("pVnode:%p vgId:%d, stable:%s, drop stable result:%x", pVnode, pTable->tableId, code);
......
run general/http/restful.sim run general/http/restful.sim
run general/http/restful_full.sim run general/http/restful_insert.sim
#run general/http/restful_limit.sim
#run general/http/restful_full.sim
#run general/http/prepare.sim
run general/http/telegraf.sim run general/http/telegraf.sim
run general/http/prepare.sim
run general/http/grafana_bug.sim run general/http/grafana_bug.sim
run general/http/grafana.sim run general/http/grafana.sim
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册