提交 9e8379a5 编写于 作者: S Shengliang Guan

TD-3130

上级 32510fa4
......@@ -309,8 +309,8 @@ do { \
#define TSDB_MAX_DB_REPLICA_OPTION 3
#define TSDB_DEFAULT_DB_REPLICA_OPTION 1
#define TSDB_MIN_DB_PARTITON_OPTION 1
#define TSDB_MAX_DB_PARTITON_OPTION 50000
#define TSDB_MIN_DB_PARTITON_OPTION 0
#define TSDB_MAX_DB_PARTITON_OPTION 1000
#define TSDB_DEFAULT_DB_PARTITON_OPTION 4
#define TSDB_MIN_DB_QUORUM_OPTION 1
......
......@@ -185,6 +185,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_DB_OPTION_DAYS TAOS_DEF_ERROR_CODE(0, 0x0390) //"Invalid database option: days out of range")
#define TSDB_CODE_MND_INVALID_DB_OPTION_KEEP TAOS_DEF_ERROR_CODE(0, 0x0391) //"Invalid database option: keep >= keep1 >= keep0 >= days")
#define TSDB_CODE_MND_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x0392) //"Invalid topic name)
#define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x0393) //"Invalid topic option)
// dnode
#define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed")
#define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory")
......
......@@ -683,7 +683,8 @@ typedef struct {
int8_t cacheLastRow;
int32_t vgCfgVersion;
int8_t dbReplica;
int8_t reserved[9];
int8_t dbType;
int8_t reserved[8];
} SVnodeCfg;
typedef struct {
......
......@@ -22,6 +22,7 @@ extern "C" {
int32_t tpInit();
void tpCleanUp();
void tpUpdateTs(int32_t *seq, void *pMsg);
#ifdef __cplusplus
}
......
......@@ -49,12 +49,13 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb);
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
#ifndef _TOPIC
int32_t tpInit() {}
void tpCleanUp() {}
void tpUpdateTs(int32_t *seq, void *pMsg) {}
#endif
static void mnodeDestroyDb(SDbObj *pDb) {
......@@ -180,16 +181,9 @@ int32_t mnodeInitDbs() {
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TP, mnodeProcessCreateDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TP, mnodeProcessAlterDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_TP, mnodeProcessDropDbMsg);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_TP, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TP, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_TP, mnodeCancelGetNextDb);
mDebug("table:dbs table is created");
return 0;
return tpInit();
}
void *mnodeGetNextDb(void *pIter, SDbObj **pDb) {
......@@ -345,6 +339,17 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
if (pCfg->dbType < 0 || pCfg->dbType > 1) {
mError("invalid db option dbType:%d valid range: [%d, %d]", pCfg->dbType, 0, 1);
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
if (pCfg->partitions < TSDB_MIN_DB_PARTITON_OPTION || pCfg->partitions > TSDB_MAX_DB_PARTITON_OPTION) {
mError("invalid db option partitions:%d valid range: [%d, %d]", pCfg->partitions, TSDB_MIN_DB_PARTITON_OPTION,
TSDB_MAX_DB_PARTITON_OPTION);
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
return TSDB_CODE_SUCCESS;
}
......@@ -697,7 +702,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
pShow->pIter = mnodeGetNextDb(pShow->pIter, &pDb);
if (pDb == NULL) break;
if (pDb->pAcct != pUser->pAcct || pDb->status != TSDB_DB_STATUS_READY /*|| pDb->cfg.dbType != TSDB_DB_TYPE_DEFAULT*/) {
if (pDb->pAcct != pUser->pAcct || pDb->status != TSDB_DB_STATUS_READY) {
mnodeDecDbRef(pDb);
continue;
}
......@@ -1092,7 +1097,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
return code;
}
static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
SAlterDbMsg *pAlter = pMsg->rpcMsg.pCont;
mDebug("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->rpcMsg.handle);
......
......@@ -367,6 +367,11 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
maxIdPoolSize = MAX(maxIdPoolSize, idPoolSize);
}
// create one table each vnode
if (pDb->cfg.dbType == TSDB_DB_TYPE_TOPIC) {
maxIdPoolSize = 1;
}
// new vgroup
if (pInputVgroup->idPool == NULL) {
pInputVgroup->idPool = taosInitIdPool(maxIdPoolSize);
......@@ -379,6 +384,11 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
}
}
// create one table each vnode
if (pDb->cfg.dbType == TSDB_DB_TYPE_TOPIC) {
return TSDB_CODE_SUCCESS;
}
// realloc all vgroups in db
int32_t newIdPoolSize;
if (minIdPoolSize * 4 < tsTableIncStepPerVnode) {
......@@ -449,6 +459,10 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
maxVgroupsPerDb = MIN(maxVgroupsPerDb, TSDB_MAX_VNODES_PER_DB);
}
if (pDb->cfg.dbType == TSDB_DB_TYPE_TOPIC && pDb->cfg.partitions > 0) {
maxVgroupsPerDb = pDb->cfg.partitions;
}
int32_t code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
if (pDb->numOfVgroups < maxVgroupsPerDb) {
mDebug("msg:%p, app:%p db:%s, try to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d", pMsg,
......@@ -880,6 +894,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
pCfg->update = pDb->cfg.update;
pCfg->cacheLastRow = pDb->cfg.cacheLastRow;
pCfg->dbReplica = pDb->cfg.replications;
pCfg->dbType = pDb->cfg.dbType;
SVnodeDesc *pNodes = pVnode->nodes;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
......
......@@ -197,6 +197,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_READY, "Database unsynced")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION_DAYS, "Invalid database option: days out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION_KEEP, "Invalid database option: keep >= keep1 >= keep0 >= days")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Invalid topic option")
// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, "Dnode out of memory")
......
......@@ -40,6 +40,7 @@ typedef struct {
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t flowctrlLevel;
int32_t sequence; // for topic
int8_t status;
int8_t role;
int8_t accessState;
......@@ -47,7 +48,7 @@ typedef struct {
int8_t isCommiting;
int8_t dbReplica;
int8_t dropped;
int8_t reserved;
int8_t dbType;
uint64_t version; // current version
uint64_t cversion; // version while commit start
uint64_t fversion; // version on saved data file
......
......@@ -42,6 +42,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
pVnode->syncCfg.replica = vnodeMsg->cfg.vgReplica;
pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum;
pVnode->dbReplica = vnodeMsg->cfg.dbReplica;
pVnode->dbType = vnodeMsg->cfg.dbType;
for (int i = 0; i < pVnode->syncCfg.replica; ++i) {
SVnodeDesc *node = &vnodeMsg->nodes[i];
......@@ -214,7 +215,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
cJSON *dbReplica = cJSON_GetObjectItem(root, "dbReplica");
if (!dbReplica || dbReplica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, dbReplica not found", pVnode->vgId, file);
vWarn("vgId:%d, failed to read %s, dbReplica not found", pVnode->vgId, file);
vnodeMsg.cfg.dbReplica = vnodeMsg.cfg.vgReplica;
vnodeMsg.cfg.vgCfgVersion = 0;
} else {
......@@ -230,7 +231,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
cJSON *update = cJSON_GetObjectItem(root, "update");
if (!update || update->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, update not found", pVnode->vgId, file);
vWarn("vgId: %d, failed to read %s, update not found", pVnode->vgId, file);
vnodeMsg.cfg.update = 0;
vnodeMsg.cfg.vgCfgVersion = 0;
} else {
......@@ -239,13 +240,21 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file);
vWarn("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file);
vnodeMsg.cfg.cacheLastRow = 0;
vnodeMsg.cfg.vgCfgVersion = 0;
} else {
vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint;
}
cJSON *dbType = cJSON_GetObjectItem(root, "dbType");
if (!dbType || dbType->type != cJSON_Number) {
vWarn("vgId: %d, failed to read %s, dbType not found", pVnode->vgId, file);
vnodeMsg.cfg.dbType = 0;
} else {
vnodeMsg.cfg.dbType = (int8_t)dbType->valueint;
}
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file);
......@@ -337,6 +346,7 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"update\": %d,\n", pMsg->cfg.update);
len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow);
len += snprintf(content + len, maxLen - len, " \"dbType\": %d,\n", pMsg->cfg.dbType);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pMsg->cfg.vgReplica; i++) {
SVnodeDesc *node = &pMsg->nodes[i];
......
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tp.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tglobal.h"
......@@ -139,6 +140,10 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
vTrace("vgId:%d, submit msg is processed", pVnode->vgId);
if (pVnode->dbType == TSDB_DB_TYPE_TOPIC && pVnode->role == TAOS_SYNC_ROLE_MASTER) {
tpUpdateTs(&pVnode->sequence, pCont);
}
// save insert result into item
SShellSubmitRspMsg *pRsp = NULL;
if (pRet) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册