diff --git a/CMakeLists.txt b/CMakeLists.txt index 7bb36fe1b001473cf5641ad195959581affeb2cb..be97e679d1c2ca6229013a96e8946d6a18068ed3 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,7 @@ SET(TD_GRANT FALSE) SET(TD_MQTT FALSE) SET(TD_TSDB_PLUGINS FALSE) SET(TD_STORAGE FALSE) +SET(TD_TOPIC FALSE) SET(TD_COVER FALSE) SET(TD_MEM_CHECK FALSE) diff --git a/cmake/define.inc b/cmake/define.inc index ae90410f2d6873b60ee0e355f18462983e615545..ff4583d02bd924f59701a8302a2e9d8dbb32a14f 100755 --- a/cmake/define.inc +++ b/cmake/define.inc @@ -25,6 +25,10 @@ IF (TD_STORAGE) ADD_DEFINITIONS(-D_STORAGE) ENDIF () +IF (TD_TOPIC) + ADD_DEFINITIONS(-D_TOPIC) +ENDIF () + IF (TD_GODLL) ADD_DEFINITIONS(-D_TD_GO_DLL_) ENDIF () diff --git a/cmake/input.inc b/cmake/input.inc index e8324887a051ae6ac27eff6b748c824d1f0a3fa5..b1a993c996724d6f8948d98de3600db856a09c86 100755 --- a/cmake/input.inc +++ b/cmake/input.inc @@ -9,6 +9,14 @@ ELSEIF (${ACCOUNT} MATCHES "false") MESSAGE(STATUS "Build without account plugins") ENDIF () +IF (${TOPIC} MATCHES "true") + SET(TD_TOPIC TRUE) + MESSAGE(STATUS "Build with topic plugins") +ELSEIF (${TOPIC} MATCHES "false") + SET(TD_TOPIC FALSE) + MESSAGE(STATUS "Build without topic plugins") +ENDIF () + IF (${COVER} MATCHES "true") SET(TD_COVER TRUE) MESSAGE(STATUS "Build with test coverage") diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index c6d0226244b9b64d21fcc6c7939d61fa27a55525..df1a622101afa032001ed499495656186b40754f 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -95,6 +95,7 @@ extern int8_t tsCompression; extern int8_t tsWAL; extern int32_t tsFsyncPeriod; extern int32_t tsReplications; +extern int16_t tsPartitons; extern int32_t tsQuorum; extern int8_t tsUpdate; extern int8_t tsCacheLastRow; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 80bf48364c2c634a47122b0834f07d8c229c0ef7..78b90113a545cd42dbd80dbe870da53af69d7e3a 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -125,6 +125,7 @@ int8_t tsCompression = TSDB_DEFAULT_COMP_LEVEL; int8_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; +int16_t tsPartitons = TSDB_DEFAULT_DB_PARTITON_OPTION; int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION; int8_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION; int8_t tsCacheLastRow = TSDB_DEFAULT_CACHE_BLOCK_SIZE; @@ -853,6 +854,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "partitions"; + cfg.ptr = &tsPartitons; + cfg.valType = TAOS_CFG_VTYPE_INT16; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = TSDB_MIN_DB_PARTITON_OPTION; + cfg.maxValue = TSDB_MAX_DB_PARTITON_OPTION; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "quorum"; cfg.ptr = &tsQuorum; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 644a4e875d62622c07034639a4e08e584e99fdfb..516e5b4d1f59f432bc7cac402854b491ba0569ff 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -31,6 +31,10 @@ IF (TD_MQTT) TARGET_LINK_LIBRARIES(taosd mqtt) ENDIF () +IF (TD_TOPIC) + TARGET_LINK_LIBRARIES(taosd topic) +ENDIF () + SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_TARGET "prepare_env_target") ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 9f3c31f22595d6dc6007dc9517390b970b781943..1d6c684a01faba490577b9d6864be2091943818e 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -222,6 +222,9 @@ do { \ #define TSDB_MQTT_TOPIC_LEN 64 #define TSDB_MQTT_CLIENT_ID_LEN 32 +#define TSDB_DB_TYPE_DEFAULT 0 +#define TSDB_DB_TYPE_TOPIC 1 + #define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE @@ -306,6 +309,10 @@ do { \ #define TSDB_MAX_DB_REPLICA_OPTION 3 #define TSDB_DEFAULT_DB_REPLICA_OPTION 1 +#define TSDB_MIN_DB_PARTITON_OPTION 1 +#define TSDB_MAX_DB_PARTITON_OPTION 50000 +#define TSDB_DEFAULT_DB_PARTITON_OPTION 4 + #define TSDB_MIN_DB_QUORUM_OPTION 1 #define TSDB_MAX_DB_QUORUM_OPTION 2 #define TSDB_DEFAULT_DB_QUORUM_OPTION 1 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 721b9ca6054d7a1fca12304eef33a2a0ce00ddfc..40446e56f56f9fdcad888de25a5d64a6a8f6c3ca 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -107,6 +107,12 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) +// message for topic +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) + #ifndef TAOS_MESSAGE_C TSDB_MSG_TYPE_MAX // 105 #endif @@ -141,6 +147,7 @@ enum _mgmt_table { TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_STREAMTABLES, TSDB_MGMT_TABLE_CLUSTER, + TSDB_MGMT_TABLE_TP, TSDB_MGMT_TABLE_MAX, }; @@ -555,7 +562,9 @@ typedef struct { int8_t ignoreExist; int8_t update; int8_t cacheLastRow; - int8_t reserve[8]; + int8_t dbType; + int16_t partitions; + int8_t reserve[5]; } SCreateDbMsg, SAlterDbMsg; typedef struct { diff --git a/src/inc/tp.h b/src/inc/tp.h new file mode 100644 index 0000000000000000000000000000000000000000..d2165f1d61380c1cff9acdea28acc169e3589e46 --- /dev/null +++ b/src/inc/tp.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TP +#define TDENGINE_TP + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t tpInit(); +void tpCleanUp(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index 5eeac97209634206d42e1c4aebec129e67ee38e5..ed1de1b87a475cf5a2264abb4b8787c0841d1b63 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -175,7 +175,9 @@ typedef struct { int8_t quorum; int8_t update; int8_t cacheLastRow; - int8_t reserved[10]; + int8_t dbType; + int16_t partitions; + int8_t reserved[7]; } SDbCfg; typedef struct SDbObj { diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 9fdbaa79650c6b1a421501a7c5fe4b4d1709ea3f..1c6231558f582b9d79db3feaa46c1c175c47bddd 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -22,6 +22,7 @@ #include "tname.h" #include "tbn.h" #include "tdataformat.h" +#include "tp.h" #include "mnode.h" #include "mnodeDef.h" #include "mnodeInt.h" @@ -38,8 +39,8 @@ #include "mnodeVgroup.h" #define VG_LIST_SIZE 8 -int64_t tsDbRid = -1; -static void * tsDbSdb = NULL; +int64_t tsDbRid = -1; +void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg); @@ -51,6 +52,11 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); +#ifndef _TOPIC +int32_t tpInit() {} +void tpCleanUp() {} +#endif + static void mnodeDestroyDb(SDbObj *pDb) { pthread_mutex_destroy(&pDb->mutex); tfree(pDb->vgList); @@ -174,7 +180,14 @@ int32_t mnodeInitDbs() { mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); - + + mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TP, mnodeProcessCreateDbMsg); + mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TP, mnodeProcessAlterDbMsg); + mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_TP, mnodeProcessDropDbMsg); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_TP, mnodeGetDbMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TP, mnodeRetrieveDbs); + mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_TP, mnodeCancelGetNextDb); + mDebug("table:dbs table is created"); return 0; } @@ -354,6 +367,8 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->quorum < 0) pCfg->quorum = tsQuorum; if (pCfg->update < 0) pCfg->update = tsUpdate; if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow; + if (pCfg->dbType < 0) pCfg->dbType = 0; + if (pCfg->partitions < 0) pCfg->partitions = tsPartitons; } static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) { @@ -408,7 +423,9 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * .replications = pCreate->replications, .quorum = pCreate->quorum, .update = pCreate->update, - .cacheLastRow = pCreate->cacheLastRow + .cacheLastRow = pCreate->cacheLastRow, + .dbType = pCreate->dbType, + .partitions = pCreate->partitions }; mnodeSetDefaultDbCfg(&pDb->cfg); @@ -501,6 +518,7 @@ void mnodeRemoveVgroupFromDb(SVgObj *pVgroup) { } void mnodeCleanupDbs() { + tpCleanUp(); sdbCloseTable(tsDbRid); tsDbSdb = NULL; } @@ -660,7 +678,7 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn return 0; } -static char *mnodeGetDbStr(char *src) { +char *mnodeGetDbStr(char *src) { char *pos = strstr(src, TS_PATH_DELIMITER); if (pos != NULL) ++pos; @@ -679,7 +697,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void pShow->pIter = mnodeGetNextDb(pShow->pIter, &pDb); if (pDb == NULL) break; - if (pDb->pAcct != pUser->pAcct || pDb->status != TSDB_DB_STATUS_READY) { + if (pDb->pAcct != pUser->pAcct || pDb->status != TSDB_DB_STATUS_READY /*|| pDb->cfg.dbType != TSDB_DB_TYPE_DEFAULT*/) { mnodeDecDbRef(pDb); continue; } @@ -852,6 +870,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) { pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2); pCreate->commitTime = htonl(pCreate->commitTime); pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod); + pCreate->partitions = htons(pCreate->partitions); pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock); pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock); @@ -887,6 +906,8 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { int8_t precision = pAlter->precision; int8_t update = pAlter->update; int8_t cacheLastRow = pAlter->cacheLastRow; + int8_t dbType = pAlter->dbType; + int16_t partitions = pAlter->partitions; terrno = TSDB_CODE_SUCCESS; @@ -1004,6 +1025,16 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { newCfg.cacheLastRow = cacheLastRow; } + if (dbType >= 0 && dbType != pDb->cfg.dbType) { + mError("db:%s, can't alter dbType option", pDb->name); + terrno = TSDB_CODE_MND_INVALID_DB_OPTION; + } + + if (partitions >= 0 && partitions != pDb->cfg.partitions) { + mDebug("db:%s, partitions:%d change to %d", pDb->name, pDb->cfg.partitions, partitions); + newCfg.partitions = partitions; + } + return newCfg; }