diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 1d6c684a01faba490577b9d6864be2091943818e..1adf3f0f0319d9653982a5539ee78d6a9f9a646f 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -309,8 +309,8 @@ do { \ #define TSDB_MAX_DB_REPLICA_OPTION 3 #define TSDB_DEFAULT_DB_REPLICA_OPTION 1 -#define TSDB_MIN_DB_PARTITON_OPTION 1 -#define TSDB_MAX_DB_PARTITON_OPTION 50000 +#define TSDB_MIN_DB_PARTITON_OPTION 0 +#define TSDB_MAX_DB_PARTITON_OPTION 1000 #define TSDB_DEFAULT_DB_PARTITON_OPTION 4 #define TSDB_MIN_DB_QUORUM_OPTION 1 diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 3eb197868b26115ff25206cc8db2bdd809c634cc..af933fa4e9cb194034b68643cdf23dbb6811ee02 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -185,6 +185,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_INVALID_DB_OPTION_DAYS TAOS_DEF_ERROR_CODE(0, 0x0390) //"Invalid database option: days out of range") #define TSDB_CODE_MND_INVALID_DB_OPTION_KEEP TAOS_DEF_ERROR_CODE(0, 0x0391) //"Invalid database option: keep >= keep1 >= keep0 >= days") +#define TSDB_CODE_MND_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x0392) //"Invalid topic name) +#define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x0393) //"Invalid topic option) + // dnode #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed") #define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 40446e56f56f9fdcad888de25a5d64a6a8f6c3ca..f740575b7a0a59c62cd5819dfac1831f26565bec 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -683,7 +683,8 @@ typedef struct { int8_t cacheLastRow; int32_t vgCfgVersion; int8_t dbReplica; - int8_t reserved[9]; + int8_t dbType; + int8_t reserved[8]; } SVnodeCfg; typedef struct { diff --git a/src/inc/tp.h b/src/inc/tp.h index d2165f1d61380c1cff9acdea28acc169e3589e46..b0b787bf689dafb016416cb82c1ebeb049d7c104 100644 --- a/src/inc/tp.h +++ b/src/inc/tp.h @@ -22,6 +22,7 @@ extern "C" { int32_t tpInit(); void tpCleanUp(); +void tpUpdateTs(int32_t *seq, void *pMsg); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 1c6231558f582b9d79db3feaa46c1c175c47bddd..7fbf43f253223dd2076d9a29e27277255ae1dfb6 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -49,12 +49,13 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb); static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); -static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); +int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); #ifndef _TOPIC int32_t tpInit() {} void tpCleanUp() {} +void tpUpdateTs(int32_t *seq, void *pMsg) {} #endif static void mnodeDestroyDb(SDbObj *pDb) { @@ -180,16 +181,9 @@ int32_t mnodeInitDbs() { mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); - - mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TP, mnodeProcessCreateDbMsg); - mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TP, mnodeProcessAlterDbMsg); - mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_TP, mnodeProcessDropDbMsg); - mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_TP, mnodeGetDbMeta); - mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TP, mnodeRetrieveDbs); - mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_TP, mnodeCancelGetNextDb); - + mDebug("table:dbs table is created"); - return 0; + return tpInit(); } void *mnodeGetNextDb(void *pIter, SDbObj **pDb) { @@ -345,6 +339,17 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) { return TSDB_CODE_MND_INVALID_DB_OPTION; } + if (pCfg->dbType < 0 || pCfg->dbType > 1) { + mError("invalid db option dbType:%d valid range: [%d, %d]", pCfg->dbType, 0, 1); + return TSDB_CODE_MND_INVALID_DB_OPTION; + } + + if (pCfg->partitions < TSDB_MIN_DB_PARTITON_OPTION || pCfg->partitions > TSDB_MAX_DB_PARTITON_OPTION) { + mError("invalid db option partitions:%d valid range: [%d, %d]", pCfg->partitions, TSDB_MIN_DB_PARTITON_OPTION, + TSDB_MAX_DB_PARTITON_OPTION); + return TSDB_CODE_MND_INVALID_DB_OPTION; + } + return TSDB_CODE_SUCCESS; } @@ -697,7 +702,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void pShow->pIter = mnodeGetNextDb(pShow->pIter, &pDb); if (pDb == NULL) break; - if (pDb->pAcct != pUser->pAcct || pDb->status != TSDB_DB_STATUS_READY /*|| pDb->cfg.dbType != TSDB_DB_TYPE_DEFAULT*/) { + if (pDb->pAcct != pUser->pAcct || pDb->status != TSDB_DB_STATUS_READY) { mnodeDecDbRef(pDb); continue; } @@ -1092,7 +1097,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { return code; } -static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) { +int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) { SAlterDbMsg *pAlter = pMsg->rpcMsg.pCont; mDebug("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->rpcMsg.handle); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 67ee11640b04c355f0a15d0df12eacc5f1837d7b..f17f3ad8d902e61bfa4fd5007c37af28911dce9b 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -367,6 +367,11 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) { maxIdPoolSize = MAX(maxIdPoolSize, idPoolSize); } + // create one table each vnode + if (pDb->cfg.dbType == TSDB_DB_TYPE_TOPIC) { + maxIdPoolSize = 1; + } + // new vgroup if (pInputVgroup->idPool == NULL) { pInputVgroup->idPool = taosInitIdPool(maxIdPoolSize); @@ -379,6 +384,11 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) { } } + // create one table each vnode + if (pDb->cfg.dbType == TSDB_DB_TYPE_TOPIC) { + return TSDB_CODE_SUCCESS; + } + // realloc all vgroups in db int32_t newIdPoolSize; if (minIdPoolSize * 4 < tsTableIncStepPerVnode) { @@ -449,6 +459,10 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi maxVgroupsPerDb = MIN(maxVgroupsPerDb, TSDB_MAX_VNODES_PER_DB); } + if (pDb->cfg.dbType == TSDB_DB_TYPE_TOPIC && pDb->cfg.partitions > 0) { + maxVgroupsPerDb = pDb->cfg.partitions; + } + int32_t code = TSDB_CODE_MND_NO_ENOUGH_DNODES; if (pDb->numOfVgroups < maxVgroupsPerDb) { mDebug("msg:%p, app:%p db:%s, try to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d", pMsg, @@ -880,6 +894,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { pCfg->update = pDb->cfg.update; pCfg->cacheLastRow = pDb->cfg.cacheLastRow; pCfg->dbReplica = pDb->cfg.replications; + pCfg->dbType = pDb->cfg.dbType; SVnodeDesc *pNodes = pVnode->nodes; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { diff --git a/src/util/src/terror.c b/src/util/src/terror.c index 4a011b7cc7407acabe249b577280f7bbda58f79d..221e6183909682680b68c7a0debb2a90eca34c37 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -197,6 +197,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_READY, "Database unsynced") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION_DAYS, "Invalid database option: days out of range") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION_KEEP, "Invalid database option: keep >= keep1 >= keep0 >= days") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic name") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Invalid topic option") + // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, "Dnode out of memory") diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 3ec77bbc122341935a804448eca61258c634cf58..91ddf5076b0540613cdac7047f0a1ea0cdd21c0b 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -40,6 +40,7 @@ typedef struct { int32_t queuedWMsg; int32_t queuedRMsg; int32_t flowctrlLevel; + int32_t sequence; // for topic int8_t status; int8_t role; int8_t accessState; @@ -47,7 +48,7 @@ typedef struct { int8_t isCommiting; int8_t dbReplica; int8_t dropped; - int8_t reserved; + int8_t dbType; uint64_t version; // current version uint64_t cversion; // version while commit start uint64_t fversion; // version on saved data file diff --git a/src/vnode/src/vnodeCfg.c b/src/vnode/src/vnodeCfg.c index 03f2b11eec239ad469faec66ddcbd60282e0409b..c9cd366c6406cb3212fc271f924e1c7b4c42995d 100644 --- a/src/vnode/src/vnodeCfg.c +++ b/src/vnode/src/vnodeCfg.c @@ -42,6 +42,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { pVnode->syncCfg.replica = vnodeMsg->cfg.vgReplica; pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum; pVnode->dbReplica = vnodeMsg->cfg.dbReplica; + pVnode->dbType = vnodeMsg->cfg.dbType; for (int i = 0; i < pVnode->syncCfg.replica; ++i) { SVnodeDesc *node = &vnodeMsg->nodes[i]; @@ -214,7 +215,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { cJSON *dbReplica = cJSON_GetObjectItem(root, "dbReplica"); if (!dbReplica || dbReplica->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, dbReplica not found", pVnode->vgId, file); + vWarn("vgId:%d, failed to read %s, dbReplica not found", pVnode->vgId, file); vnodeMsg.cfg.dbReplica = vnodeMsg.cfg.vgReplica; vnodeMsg.cfg.vgCfgVersion = 0; } else { @@ -230,7 +231,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { cJSON *update = cJSON_GetObjectItem(root, "update"); if (!update || update->type != cJSON_Number) { - vError("vgId: %d, failed to read %s, update not found", pVnode->vgId, file); + vWarn("vgId: %d, failed to read %s, update not found", pVnode->vgId, file); vnodeMsg.cfg.update = 0; vnodeMsg.cfg.vgCfgVersion = 0; } else { @@ -239,13 +240,21 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow"); if (!cacheLastRow || cacheLastRow->type != cJSON_Number) { - vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file); + vWarn("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file); vnodeMsg.cfg.cacheLastRow = 0; vnodeMsg.cfg.vgCfgVersion = 0; } else { vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint; } + cJSON *dbType = cJSON_GetObjectItem(root, "dbType"); + if (!dbType || dbType->type != cJSON_Number) { + vWarn("vgId: %d, failed to read %s, dbType not found", pVnode->vgId, file); + vnodeMsg.cfg.dbType = 0; + } else { + vnodeMsg.cfg.dbType = (int8_t)dbType->valueint; + } + cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); if (!nodeInfos || nodeInfos->type != cJSON_Array) { vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file); @@ -337,6 +346,7 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) { len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum); len += snprintf(content + len, maxLen - len, " \"update\": %d,\n", pMsg->cfg.update); len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow); + len += snprintf(content + len, maxLen - len, " \"dbType\": %d,\n", pMsg->cfg.dbType); len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); for (int32_t i = 0; i < pMsg->cfg.vgReplica; i++) { SVnodeDesc *node = &pMsg->nodes[i]; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 99b7e7b62890c2801835d4a6130d49b9b224ddab..a1d4f50010f1cd3825146dbc2506d53ad3e17c09 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "tp.h" #include "taosmsg.h" #include "taoserror.h" #include "tglobal.h" @@ -139,6 +140,10 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR vTrace("vgId:%d, submit msg is processed", pVnode->vgId); + if (pVnode->dbType == TSDB_DB_TYPE_TOPIC && pVnode->role == TAOS_SYNC_ROLE_MASTER) { + tpUpdateTs(&pVnode->sequence, pCont); + } + // save insert result into item SShellSubmitRspMsg *pRsp = NULL; if (pRet) {