提交 ddb9d824 编写于 作者: M Minglei Jin

[TD-2511]<fix>: DB new option cacheLastRow

上级 d48ce512
......@@ -94,6 +94,7 @@ extern int32_t tsFsyncPeriod;
extern int32_t tsReplications;
extern int32_t tsQuorum;
extern int32_t tsUpdate;
extern int32_t tsCacheLastRow;
// balance
extern int32_t tsEnableBalance;
......
......@@ -127,6 +127,7 @@ int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
int32_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION;
int32_t tsCacheLastRow = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
int32_t tsMaxVgroupsPerDb = 0;
int32_t tsMinTablePerVnode = TSDB_TABLES_STEP;
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;
......
......@@ -141,6 +141,7 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
pCreate->cfg.cacheLastRow = htonl(pCreate->cfg.cacheLastRow);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
......@@ -216,4 +217,4 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
dnodeStartMnode(&pCfg->mnodes);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
}
......@@ -369,6 +369,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MAX_DB_UPDATE 1
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 1
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
......
......@@ -548,7 +548,8 @@ typedef struct {
int8_t quorum;
int8_t ignoreExist;
int8_t update;
int8_t reserve[9];
int8_t cacheLastRow;
int8_t reserve[8];
} SCreateDbMsg, SAlterDbMsg;
typedef struct {
......@@ -660,7 +661,8 @@ typedef struct {
int8_t wals;
int8_t quorum;
int8_t update;
int8_t reserved[15];
int8_t cacheLastRow;
int8_t reserved[14];
} SVnodeCfg;
typedef struct {
......
......@@ -173,7 +173,8 @@ typedef struct {
int8_t replications;
int8_t quorum;
int8_t update;
int8_t reserved[11];
int8_t cacheLastRow;
int8_t reserved[10];
} SDbCfg;
typedef struct SDbObj {
......
......@@ -322,6 +322,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) {
mError("invalid db option cacheLastRow:%d valid range: [%d, %d]", pCfg->cacheLastRow, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW);
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
return TSDB_CODE_SUCCESS;
}
......@@ -343,6 +348,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->replications < 0) pCfg->replications = tsReplications;
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
if (pCfg->update < 0) pCfg->update = tsUpdate;
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow;
}
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
......@@ -396,7 +402,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
.walLevel = pCreate->walLevel,
.replications = pCreate->replications,
.quorum = pCreate->quorum,
.update = pCreate->update
.update = pCreate->update,
.cacheLastRow = pCreate->cacheLastRow
};
mnodeSetDefaultDbCfg(&pDb->cfg);
......@@ -750,6 +757,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.compression;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
cols++;
#ifndef __CLOUD_VERSION__
}
#endif
......@@ -864,6 +875,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
int8_t quorum = pAlter->quorum;
int8_t precision = pAlter->precision;
int8_t update = pAlter->update;
int8_t cacheLastRow = pAlter->cacheLastRow;
terrno = TSDB_CODE_SUCCESS;
......@@ -976,6 +988,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
#endif
}
if (cacheLastRow >= 0 && cacheLastRow != pDb->cfg.cacheLastRow) {
mDebug("db:%s, cacheLastRow:%d change to %d", pDb->name, pDb->cfg.cacheLastRow, cacheLastRow);
newCfg.cacheLastRow = cacheLastRow;
}
return newCfg;
}
......
......@@ -859,6 +859,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
pCfg->wals = 3;
pCfg->quorum = pDb->cfg.quorum;
pCfg->update = pDb->cfg.update;
pCfg->cacheLastRow = pDb->cfg.cacheLastRow;
SVnodeDesc *pNodes = pVnode->nodes;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
......
......@@ -33,6 +33,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock;
pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision;
pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression;
pVnode->tsdbCfg.cacheLastRow = vnodeMsg->cfg.cacheLastRow;
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP;
......@@ -207,6 +208,13 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
}
vnodeMsg.cfg.quorum = (int8_t)quorum->valueint;
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint;
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file);
......@@ -294,6 +302,7 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pMsg->cfg.replications; i++) {
SVnodeDesc *node = &pMsg->nodes[i];
......
......@@ -86,6 +86,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression;
tsdbCfg.update = pVnodeCfg->cfg.update;
tsdbCfg.cacheLastRow = pVnodeCfg->cfg.cacheLastRow;
char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册