未验证 提交 19427a9b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #8926 from taosdata/feature/dnode3

Feature/dnode3
...@@ -98,6 +98,6 @@ tramp ...@@ -98,6 +98,6 @@ tramp
.\#* .\#*
TAGS TAGS
deps/* contrib/*
!deps/CMakeLists.txt !contrib/CMakeLists.txt
!deps/test !contrib/test
...@@ -6,77 +6,14 @@ project( ...@@ -6,77 +6,14 @@ project(
DESCRIPTION "An open-source big data platform designed and optimized for the Internet of Things(IOT)" DESCRIPTION "An open-source big data platform designed and optimized for the Internet of Things(IOT)"
) )
# ============================================================================
# DEPENDENCIES
# ============================================================================
set(CMAKE_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/cmake") set(CMAKE_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/cmake")
set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib")
include(${CMAKE_SUPPORT_DIR}/cmake.options) include(${CMAKE_SUPPORT_DIR}/cmake.options)
function(cat IN_FILE OUT_FILE)
file(READ ${IN_FILE} CONTENTS)
file(APPEND ${OUT_FILE} "${CONTENTS}")
endfunction(cat IN_FILE OUT_FILE)
set(DEPS_TMP_FILE "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in")
configure_file("${CMAKE_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${DEPS_TMP_FILE})
## googletest
if(${BUILD_TEST})
cat("${CMAKE_SUPPORT_DIR}/gtest_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_TEST})
## lz4
cat("${CMAKE_SUPPORT_DIR}/lz4_CMakeLists.txt.in" ${DEPS_TMP_FILE})
## zlib
cat("${CMAKE_SUPPORT_DIR}/zlib_CMakeLists.txt.in" ${DEPS_TMP_FILE})
## cJson
cat("${CMAKE_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${DEPS_TMP_FILE})
## leveldb
if(${BUILD_WITH_LEVELDB})
cat("${CMAKE_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_LEVELDB})
## rocksdb
if(${BUILD_WITH_ROCKSDB})
cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE})
add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB})
## bdb
if(${BUILD_WITH_BDB})
cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_DBD})
## sqlite
if(${BUILD_WITH_SQLITE})
cat("${CMAKE_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_SQLITE})
## lucene
if(${BUILD_WITH_LUCENE})
cat("${CMAKE_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${DEPS_TMP_FILE})
add_definitions(-DUSE_LUCENE)
endif(${BUILD_WITH_LUCENE})
## NuRaft
if(${BUILD_WITH_NURAFT})
cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_NURAFT})
## download dependencies
configure_file(${DEPS_TMP_FILE} "${CMAKE_SOURCE_DIR}/deps/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/deps/deps-download")
execute_process(COMMAND "${CMAKE_COMMAND}" --build .
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/deps/deps-download")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma")
# deps # contrib
add_subdirectory(deps) add_subdirectory(contrib)
# api # api
add_library(api INTERFACE) add_library(api INTERFACE)
......
...@@ -3,8 +3,8 @@ ...@@ -3,8 +3,8 @@
ExternalProject_Add(bdb ExternalProject_Add(bdb
GIT_REPOSITORY https://github.com/berkeleydb/libdb.git GIT_REPOSITORY https://github.com/berkeleydb/libdb.git
GIT_TAG v5.3.28 GIT_TAG v5.3.28
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/bdb" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/bdb"
BINARY_DIR "${CMAKE_SOURCE_DIR}/deps/bdb" BINARY_DIR "${CMAKE_CONTRIB_DIR}/bdb"
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND "./dist/configure" CONFIGURE_COMMAND "./dist/configure"
BUILD_COMMAND "$(MAKE)" BUILD_COMMAND "$(MAKE)"
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
ExternalProject_Add(cjson ExternalProject_Add(cjson
GIT_REPOSITORY https://github.com/taosdata-contrib/cJSON.git GIT_REPOSITORY https://github.com/taosdata-contrib/cJSON.git
GIT_TAG v1.7.15 GIT_TAG v1.7.15
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/cJson" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/cJson"
BINARY_DIR "" BINARY_DIR ""
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
BUILD_COMMAND "" BUILD_COMMAND ""
......
...@@ -52,5 +52,5 @@ option( ...@@ -52,5 +52,5 @@ option(
option( option(
BUILD_DOCS BUILD_DOCS
"If use doxygen build documents" "If use doxygen build documents"
ON OFF
) )
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
ExternalProject_Add(googletest ExternalProject_Add(googletest
GIT_REPOSITORY https://github.com/taosdata-contrib/googletest.git GIT_REPOSITORY https://github.com/taosdata-contrib/googletest.git
GIT_TAG release-1.11.0 GIT_TAG release-1.11.0
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/googletest" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/googletest"
BINARY_DIR "" BINARY_DIR ""
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
BUILD_COMMAND "" BUILD_COMMAND ""
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
ExternalProject_Add(leveldb ExternalProject_Add(leveldb
GIT_REPOSITORY https://github.com/taosdata-contrib/leveldb.git GIT_REPOSITORY https://github.com/taosdata-contrib/leveldb.git
GIT_TAG master GIT_TAG master
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/leveldb" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/leveldb"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# lucene # lucene
ExternalProject_Add(lucene ExternalProject_Add(lucene
GIT_REPOSITORY https://github.com/yihaoDeng/LucenePlusPlus.git GIT_REPOSITORY https://github.com/yihaoDeng/LucenePlusPlus.git
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/lucene" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/lucene"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
ExternalProject_Add(lz4 ExternalProject_Add(lz4
GIT_REPOSITORY https://github.com/taosdata-contrib/lz4.git GIT_REPOSITORY https://github.com/taosdata-contrib/lz4.git
GIT_TAG v1.9.3 GIT_TAG v1.9.3
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/lz4" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/lz4"
BINARY_DIR "" BINARY_DIR ""
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
BUILD_COMMAND "" BUILD_COMMAND ""
......
...@@ -3,8 +3,8 @@ ...@@ -3,8 +3,8 @@
ExternalProject_Add(NuRaft ExternalProject_Add(NuRaft
GIT_REPOSITORY https://github.com/eBay/NuRaft.git GIT_REPOSITORY https://github.com/eBay/NuRaft.git
GIT_TAG v1.3.0 GIT_TAG v1.3.0
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/nuraft" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/nuraft"
BINARY_DIR "${CMAKE_SOURCE_DIR}/deps/nuraft" BINARY_DIR "${CMAKE_CONTRIB_DIR}/nuraft"
CONFIGURE_COMMAND "./prepare.sh" CONFIGURE_COMMAND "./prepare.sh"
BUILD_COMMAND "" BUILD_COMMAND ""
INSTALL_COMMAND "" INSTALL_COMMAND ""
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
ExternalProject_Add(rocksdb ExternalProject_Add(rocksdb
GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git
GIT_TAG v6.23.3 GIT_TAG v6.23.3
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/rocksdb" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
BUILD_COMMAND "" BUILD_COMMAND ""
INSTALL_COMMAND "" INSTALL_COMMAND ""
......
...@@ -3,8 +3,8 @@ ...@@ -3,8 +3,8 @@
ExternalProject_Add(sqlite ExternalProject_Add(sqlite
GIT_REPOSITORY https://github.com/sqlite/sqlite.git GIT_REPOSITORY https://github.com/sqlite/sqlite.git
GIT_TAG version-3.36.0 GIT_TAG version-3.36.0
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/sqlite" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/sqlite"
BINARY_DIR "${CMAKE_SOURCE_DIR}/deps/sqlite" BINARY_DIR "${CMAKE_CONTRIB_DIR}/sqlite"
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND "./configure" CONFIGURE_COMMAND "./configure"
BUILD_COMMAND "$(MAKE)" BUILD_COMMAND "$(MAKE)"
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
ExternalProject_Add(zlib ExternalProject_Add(zlib
GIT_REPOSITORY https://github.com/taosdata-contrib/zlib.git GIT_REPOSITORY https://github.com/taosdata-contrib/zlib.git
GIT_TAG v1.2.11 GIT_TAG v1.2.11
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/zlib" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/zlib"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
......
# ================================================================================================ # ================================================================================================
# DEPENDENCIES # Download
# ================================================================================================
function(cat IN_FILE OUT_FILE)
file(READ ${IN_FILE} CONTENTS)
file(APPEND ${OUT_FILE} "${CONTENTS}")
endfunction(cat IN_FILE OUT_FILE)
set(CONTRIB_TMP_FILE "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in")
configure_file("${CMAKE_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# googletest
if(${BUILD_TEST})
cat("${CMAKE_SUPPORT_DIR}/gtest_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_TEST})
# lz4
cat("${CMAKE_SUPPORT_DIR}/lz4_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# zlib
cat("${CMAKE_SUPPORT_DIR}/zlib_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cJson
cat("${CMAKE_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# leveldb
if(${BUILD_WITH_LEVELDB})
cat("${CMAKE_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_LEVELDB})
# rocksdb
if(${BUILD_WITH_ROCKSDB})
cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB})
# bdb
if(${BUILD_WITH_BDB})
cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_DBD})
# sqlite
if(${BUILD_WITH_SQLITE})
cat("${CMAKE_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_SQLITE})
# lucene
if(${BUILD_WITH_LUCENE})
cat("${CMAKE_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_LUCENE)
endif(${BUILD_WITH_LUCENE})
# NuRaft
if(${BUILD_WITH_NURAFT})
cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_NURAFT})
# download dependencies
configure_file(${CONTRIB_TMP_FILE} "${CMAKE_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
WORKING_DIRECTORY "${CMAKE_CONTRIB_DIR}/deps-download")
execute_process(COMMAND "${CMAKE_COMMAND}" --build .
WORKING_DIRECTORY "${CMAKE_CONTRIB_DIR}/deps-download")
# ================================================================================================
# Build
# ================================================================================================ # ================================================================================================
# googletest # googletest
if(${BUILD_TEST}) if(${BUILD_TEST})
...@@ -82,7 +146,7 @@ endif(${BUILD_WITH_NURAFT}) ...@@ -82,7 +146,7 @@ endif(${BUILD_WITH_NURAFT})
# BDB # BDB
if(${BUILD_WITH_BDB}) if(${BUILD_WITH_BDB})
add_library(bdb STATIC IMPORTED) add_library(bdb STATIC IMPORTED GLOBAL)
set_target_properties(bdb PROPERTIES set_target_properties(bdb PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a" IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb" INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb"
...@@ -93,8 +157,9 @@ if(${BUILD_WITH_BDB}) ...@@ -93,8 +157,9 @@ if(${BUILD_WITH_BDB})
endif(${BUILD_WITH_BDB}) endif(${BUILD_WITH_BDB})
# SQLite # SQLite
# see https://stackoverflow.com/questions/8774593/cmake-link-to-external-library#comment58570736_10550334
if(${BUILD_WITH_SQLITE}) if(${BUILD_WITH_SQLITE})
add_library(sqlite STATIC IMPORTED) add_library(sqlite STATIC IMPORTED GLOBAL)
set_target_properties(sqlite PROPERTIES set_target_properties(sqlite PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/sqlite/.libs/libsqlite3.a" IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/sqlite/.libs/libsqlite3.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/sqlite" INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/sqlite"
...@@ -109,7 +174,7 @@ endif(${BUILD_WITH_SQLITE}) ...@@ -109,7 +174,7 @@ endif(${BUILD_WITH_SQLITE})
# ================================================================================================ # ================================================================================================
# DEPENDENCY TEST # Build test
# ================================================================================================ # ================================================================================================
if(${BUILD_DEPENDENCY_TESTS}) if(${BUILD_DEPENDENCY_TESTS})
add_subdirectory(test) add_subdirectory(test)
......
...@@ -81,6 +81,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" ) ...@@ -81,6 +81,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
...@@ -153,7 +154,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) ...@@ -153,7 +154,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7 #define TSDB_IE_TYPE_DNODE_STATE 7
enum _mgmt_table { typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT, TSDB_MGMT_TABLE_ACCT,
TSDB_MGMT_TABLE_USER, TSDB_MGMT_TABLE_USER,
TSDB_MGMT_TABLE_DB, TSDB_MGMT_TABLE_DB,
...@@ -175,7 +177,7 @@ enum _mgmt_table { ...@@ -175,7 +177,7 @@ enum _mgmt_table {
TSDB_MGMT_TABLE_TP, TSDB_MGMT_TABLE_TP,
TSDB_MGMT_TABLE_FUNCTION, TSDB_MGMT_TABLE_FUNCTION,
TSDB_MGMT_TABLE_MAX, TSDB_MGMT_TABLE_MAX,
}; } EShowType;
#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 #define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1
#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 #define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2
...@@ -352,11 +354,9 @@ typedef struct { ...@@ -352,11 +354,9 @@ typedef struct {
} SUpdateTableTagValMsg; } SUpdateTableTagValMsg;
typedef struct { typedef struct {
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_FNAME_LEN];
char appName[TSDB_APPNAME_LEN];
int32_t pid; int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
} SConnectMsg; } SConnectMsg;
typedef struct SEpSet { typedef struct SEpSet {
...@@ -367,15 +367,14 @@ typedef struct SEpSet { ...@@ -367,15 +367,14 @@ typedef struct SEpSet {
} SEpSet; } SEpSet;
typedef struct { typedef struct {
char acctId[TSDB_ACCT_ID_LEN]; int32_t acctId;
char serverVersion[TSDB_VERSION_LEN]; int32_t clusterId;
char clusterId[TSDB_CLUSTER_ID_LEN]; int32_t connId;
int8_t writeAuth; int8_t superAuth;
int8_t superAuth; int8_t readAuth;
int8_t reserved1; int8_t writeAuth;
int8_t reserved2; int8_t reserved[5];
int32_t connId; SEpSet epSet;
SEpSet epSet;
} SConnectRsp; } SConnectRsp;
typedef struct { typedef struct {
...@@ -874,23 +873,23 @@ typedef struct { ...@@ -874,23 +873,23 @@ typedef struct {
} SStreamDesc; } SStreamDesc;
typedef struct { typedef struct {
char clientVer[TSDB_VERSION_LEN]; int32_t connId;
uint32_t connId; int32_t pid;
int32_t pid; int32_t numOfQueries;
int32_t numOfQueries; int32_t numOfStreams;
int32_t numOfStreams; char app[TSDB_APP_NAME_LEN];
char appName[TSDB_APPNAME_LEN]; char pData[];
char pData[];
} SHeartBeatMsg; } SHeartBeatMsg;
typedef struct { typedef struct {
uint32_t queryId; int32_t connId;
uint32_t streamId; int32_t queryId;
uint32_t totalDnodes; int32_t streamId;
uint32_t onlineDnodes; int32_t totalDnodes;
uint32_t connId; int32_t onlineDnodes;
int8_t killConnection; int8_t killConnection;
SEpSet epSet; int8_t reserved[3];
SEpSet epSet;
} SHeartBeatRsp; } SHeartBeatRsp;
typedef struct { typedef struct {
......
...@@ -57,6 +57,7 @@ typedef struct { ...@@ -57,6 +57,7 @@ typedef struct {
int32_t sver; int32_t sver;
int32_t statusInterval; int32_t statusInterval;
int32_t mnodeEqualVnodeNum; int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone; char *timezone;
char *locale; char *locale;
char *charset; char *charset;
......
...@@ -174,7 +174,7 @@ do { \ ...@@ -174,7 +174,7 @@ do { \
#define TSDB_MAX_SQL_SHOW_LEN 512 #define TSDB_MAX_SQL_SHOW_LEN 512
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb #define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb
#define TSDB_APPNAME_LEN TSDB_UNI_LEN #define TSDB_APP_NAME_LEN TSDB_UNI_LEN
/** /**
* In some scenarios uint16_t (0~65535) is used to store the row len. * In some scenarios uint16_t (0~65535) is used to store the row len.
......
...@@ -41,8 +41,10 @@ char * paGetToken(char *src, char **token, int32_t *tokenLen); ...@@ -41,8 +41,10 @@ char * paGetToken(char *src, char **token, int32_t *tokenLen);
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]); int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]);
int32_t taosHexStrToByteArray(char hexstr[], char bytes[]); int32_t taosHexStrToByteArray(char hexstr[], char bytes[]);
char * taosIpStr(uint32_t ipInt); char *taosIpStr(uint32_t ipInt);
uint32_t ip2uint(const char *const ip_addr); uint32_t ip2uint(const char *const ip_addr);
void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
MD5_CTX context; MD5_CTX context;
......
...@@ -14,3 +14,7 @@ target_include_directories( ...@@ -14,3 +14,7 @@ target_include_directories(
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mgmt" PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mgmt"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
\ No newline at end of file
...@@ -334,6 +334,7 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -334,6 +334,7 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->sver = pDnode->opt.sver; pOption->sver = pDnode->opt.sver;
pOption->statusInterval = pDnode->opt.statusInterval; pOption->statusInterval = pDnode->opt.statusInterval;
pOption->mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum; pOption->mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum;
pOption->shellActivityTimer = pDnode->opt.shellActivityTimer;
pOption->timezone = pDnode->opt.timezone; pOption->timezone = pDnode->opt.timezone;
pOption->charset = pDnode->opt.charset; pOption->charset = pDnode->opt.charset;
pOption->locale = pDnode->opt.locale; pOption->locale = pDnode->opt.locale;
...@@ -675,7 +676,7 @@ void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -675,7 +676,7 @@ void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) { if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
......
...@@ -76,6 +76,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -76,6 +76,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_SHOW] = dndProcessMnodeReadMsg;
...@@ -157,7 +158,7 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -157,7 +158,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dndProcessResponse; rpcInit.cfp = dndProcessResponse;
rpcInit.sessions = 8; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000; rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.user = INTERNAL_USER; rpcInit.user = INTERNAL_USER;
......
add_subdirectory(test01)
\ No newline at end of file
add_executable(dndTest01 "")
target_sources(dndTest01
PRIVATE
"dndTest01.cpp"
"../util/dndTestDeploy.cpp"
)
target_link_libraries(
dndTest01
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dndTest01
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../util"
)
enable_testing()
add_test(
NAME dndTest01
COMMAND dndTest01
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dndTestDeploy.h"
class DndTest01 : public ::testing::Test {
protected:
void SetUp() override {
pServer = createServer("/tmp/dndTest01");
pClient = createClient("root", "taosdata");
}
void TearDown() override {
dropServer(pServer);
dropClient(pClient);
}
SServer* pServer;
SClient* pClient;
};
TEST_F(DndTest01, connectMsg) {
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg));
pReq->pid = htonl(1234);
strcpy(pReq->app, "test01");
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SConnectMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CONNECT;
sendMsg(pClient, &rpcMsg);
SConnectRsp* pRsp = (SConnectRsp*)pClient->pRsp;
ASSERT(pRsp);
pRsp->acctId = htonl(pRsp->acctId);
pRsp->clusterId = htonl(pRsp->clusterId);
pRsp->connId = htonl(pRsp->connId);
pRsp->epSet.port[0] = htonl(pRsp->epSet.port[0]);
EXPECT_EQ(pRsp->acctId, 1);
EXPECT_GT(pRsp->clusterId, 0);
EXPECT_GT(pRsp->connId, 1);
EXPECT_EQ(pRsp->superAuth, 1);
EXPECT_EQ(pRsp->readAuth, 1);
EXPECT_EQ(pRsp->writeAuth, 1);
EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9527);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
}
// TEST_F(DndTest01, heartbeatMsg) {
// SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(sizeof(SHeartBeatMsg));
// pReq->connId = htonl(1);
// pReq->pid = htonl(1234);
// pReq->numOfQueries = htonl(0);
// pReq->numOfStreams = htonl(0);
// strcpy(pReq->app, "test01");
// SRpcMsg rpcMsg = {0};
// rpcMsg.pCont = pReq;
// rpcMsg.contLen = sizeof(SHeartBeatMsg);
// rpcMsg.msgType = TSDB_MSG_TYPE_HEARTBEAT;
// sendMsg(pClient, &rpcMsg);
// SHeartBeatRsp* pRsp = (SHeartBeatRsp*)pClient->pRsp;
// ASSERT(pRsp);
// pRsp->epSet.port[0] = htonl(pRsp->epSet.port[0]);
// EXPECT_EQ(htonl(pRsp->connId), 1);
// EXPECT_GT(htonl(pRsp->queryId), 0);
// EXPECT_GT(htonl(pRsp->streamId), 1);
// EXPECT_EQ(htonl(pRsp->totalDnodes), 1);
// EXPECT_EQ(htonl(pRsp->onlineDnodes), 1);
// EXPECT_EQ(pRsp->killConnection, 0);
// EXPECT_EQ(pRsp->epSet.inUse, 0);
// EXPECT_EQ(pRsp->epSet.numOfEps, 1);
// EXPECT_EQ(pRsp->epSet.port[0], 9527);
// EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
// }
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dndTestDeploy.h"
void initLog(char *path) {
mDebugFlag = 207;
char temp[PATH_MAX];
snprintf(temp, PATH_MAX, "%s/taosdlog", path);
if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) {
printf("failed to init log file\n");
}
}
void* runServer(void* param) {
SServer* pServer = (SServer*)param;
while (1) {
taosMsleep(100);
pthread_testcancel();
}
}
void initOption(SDnodeOpt* pOption, char *path) {
pOption->sver = 1;
pOption->numOfCores = 1;
pOption->numOfSupportMnodes = 1;
pOption->numOfSupportVnodes = 1;
pOption->numOfSupportQnodes = 1;
pOption->statusInterval = 1;
pOption->mnodeEqualVnodeNum = 1;
pOption->numOfThreadsPerCore = 1;
pOption->ratioOfQueryCores = 1;
pOption->maxShellConns = 1000;
pOption->shellActivityTimer = 30;
pOption->serverPort = 9527;
strcpy(pOption->dataDir, path);
strcpy(pOption->localEp, "localhost:9527");
strcpy(pOption->localFqdn, "localhost");
strcpy(pOption->firstEp, "localhost:9527");
taosRemoveDir(path);
taosMkDir(path);
}
SServer* createServer(char *path) {
SDnodeOpt option = {0};
initOption(&option, path);
SDnode* pDnode = dndInit(&option);
ASSERT(pDnode);
SServer* pServer = (SServer*)calloc(1, sizeof(SServer));
ASSERT(pServer);
pServer->pDnode = pDnode;
pServer->threadId = taosCreateThread(runServer, pServer);
ASSERT(pServer->threadId);
return pServer;
}
void dropServer(SServer* pServer) {
if (pServer->threadId != NULL) {
taosDestoryThread(pServer->threadId);
}
}
void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SClient* pClient = (SClient*)parent;
pClient->pRsp = pMsg;
taosMsleep(100000);
tsem_post(&pClient->sem);
}
SClient* createClient(char *user, char *pass) {
SClient* pClient = (SClient*)calloc(1, sizeof(SClient));
ASSERT(pClient);
char secretEncrypt[32] = {0};
taosEncryptPass((uint8_t*)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processClientRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000;
rpcInit.user = user;
rpcInit.ckey = "key";
rpcInit.parent = pClient;
rpcInit.secret = (char*)secretEncrypt;
rpcInit.parent = pClient;
// rpcInit.spi = 1;
pClient->clientRpc = rpcOpen(&rpcInit);
ASSERT(pClient->clientRpc);
tsem_init(&pClient->sem, 0, 0);
return pClient;
}
void dropClient(SClient* pClient) {
tsem_destroy(&pClient->sem);
rpcClose(pClient->clientRpc);
}
void sendMsg(SClient* pClient, SRpcMsg* pMsg) {
SEpSet epSet = {0};
epSet.inUse = 0;
epSet.numOfEps = 1;
epSet.port[0] = 9527;
strcpy(epSet.fqdn[0], "localhost");
rpcSendRequest(pClient->clientRpc, &epSet, pMsg, NULL);
tsem_wait(&pClient->sem);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include "os.h"
#include "dnode.h"
#include "taosmsg.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "trpc.h"
#include "tthread.h"
#include "ulog.h"
typedef struct {
SDnode* pDnode;
pthread_t* threadId;
} SServer;
typedef struct {
void* clientRpc;
SRpcMsg* pRsp;
tsem_t sem;
} SClient;
SServer* createServer(char* path);
void dropServer(SServer* pServer);
SClient* createClient(char *user, char *pass);
void dropClient(SClient* pClient);
void sendMsg(SClient* pClient, SRpcMsg* pMsg);
...@@ -24,6 +24,8 @@ extern "C" { ...@@ -24,6 +24,8 @@ extern "C" {
int32_t mndInitDb(SMnode *pMnode); int32_t mndInitDb(SMnode *pMnode);
void mndCleanupDb(SMnode *pMnode); void mndCleanupDb(SMnode *pMnode);
SDbObj *mndAcquireDb(SMnode *pMnode, char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -185,9 +185,11 @@ typedef struct SUserObj { ...@@ -185,9 +185,11 @@ typedef struct SUserObj {
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
int8_t rootAuth; int8_t superAuth;
int8_t readAuth;
int8_t writeAuth;
int32_t acctId;
SHashObj *prohibitDbHash; SHashObj *prohibitDbHash;
SAcctObj *pAcct;
} SUserObj; } SUserObj;
typedef struct { typedef struct {
...@@ -281,26 +283,30 @@ typedef struct SFuncObj { ...@@ -281,26 +283,30 @@ typedef struct SFuncObj {
int16_t type; int16_t type;
} SFuncObj; } SFuncObj;
typedef struct { typedef struct SShowObj SShowObj;
int8_t type; typedef struct SShowObj {
int8_t maxReplica; int8_t type;
int16_t numOfColumns; int8_t maxReplica;
int32_t index; int16_t numOfColumns;
int32_t rowSize; int32_t id;
int32_t numOfRows; int32_t rowSize;
int32_t numOfReads; int32_t numOfRows;
uint16_t payloadLen; int32_t numOfReads;
void *pIter; uint16_t payloadLen;
void *pVgIter; void *pIter;
void **ppShow; void *pVgIter;
char db[TSDB_FULL_DB_NAME_LEN]; SMnode *pMnode;
int16_t offset[TSDB_MAX_COLUMNS]; SShowObj **ppShow;
int32_t bytes[TSDB_MAX_COLUMNS]; char db[TSDB_FULL_DB_NAME_LEN];
char payload[]; int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS];
char payload[];
} SShowObj; } SShowObj;
typedef struct SMnodeMsg { typedef struct SMnodeMsg {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char db[TSDB_FULL_DB_NAME_LEN];
int32_t acctId;
SMnode *pMnode; SMnode *pMnode;
int16_t received; int16_t received;
int16_t successed; int16_t successed;
......
...@@ -18,15 +18,19 @@ ...@@ -18,15 +18,19 @@
#include "mndDef.h" #include "mndDef.h"
#include "sdb.h" #include "sdb.h"
#include "tcache.h"
#include "tqueue.h" #include "tqueue.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg); typedef int32_t (*MndMsgFp)(SMnodeMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
typedef int32_t (*ShowRetrieveFp)(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct { typedef struct {
const char *name; const char *name;
...@@ -34,6 +38,19 @@ typedef struct { ...@@ -34,6 +38,19 @@ typedef struct {
MndCleanupFp cleanupFp; MndCleanupFp cleanupFp;
} SMnodeStep; } SMnodeStep;
typedef struct {
int32_t showId;
ShowMetaFp metaFps[TSDB_MGMT_TABLE_MAX];
ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX];
ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX];
SCacheObj *cache;
} SShowMgmt;
typedef struct {
int32_t connId;
SCacheObj *cache;
} SProfileMgmt;
typedef struct SMnode { typedef struct SMnode {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int32_t clusterId;
...@@ -45,6 +62,8 @@ typedef struct SMnode { ...@@ -45,6 +62,8 @@ typedef struct SMnode {
SSdb *pSdb; SSdb *pSdb;
SDnode *pDnode; SDnode *pDnode;
SArray *pSteps; SArray *pSteps;
SShowMgmt showMgmt;
SProfileMgmt profileMgmt;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX]; MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
...@@ -53,6 +72,7 @@ typedef struct SMnode { ...@@ -53,6 +72,7 @@ typedef struct SMnode {
int32_t sver; int32_t sver;
int32_t statusInterval; int32_t statusInterval;
int32_t mnodeEqualVnodeNum; int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone; char *timezone;
char *locale; char *locale;
char *charset; char *charset;
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
int32_t mndInitMnode(SMnode *pMnode); int32_t mndInitMnode(SMnode *pMnode);
void mndCleanupMnode(SMnode *pMnode); void mndCleanupMnode(SMnode *pMnode);
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,6 +24,10 @@ extern "C" { ...@@ -24,6 +24,10 @@ extern "C" {
int32_t mndInitShow(SMnode *pMnode); int32_t mndInitShow(SMnode *pMnode);
void mndCleanupShow(SMnode *pMnode); void mndCleanupShow(SMnode *pMnode);
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp);
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp);
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,7 +24,7 @@ extern "C" { ...@@ -24,7 +24,7 @@ extern "C" {
int32_t mndInitUser(SMnode *pMnode); int32_t mndInitUser(SMnode *pMnode);
void mndCleanupUser(SMnode *pMnode); void mndCleanupUser(SMnode *pMnode);
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName); SUserObj *mndAcquireUser(SMnode *pMnode, char *userName);
void mndReleaseUser(SMnode *pMnode, SUserObj *pUser); void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -14,10 +14,61 @@ ...@@ -14,10 +14,61 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndInt.h" #include "mndAcct.h"
#include "mndShow.h"
#define SDB_ACCT_VER 1 #define SDB_ACCT_VER 1
static int32_t mnodeCreateDefaultAcct(SMnode *pMnode);
static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct);
static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw);
static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct);
static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct);
static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *pDstAcct);
static int32_t mndProcessCreateAcctMsg(SMnodeMsg *pMnodeMsg);
static int32_t mndProcessAlterAcctMsg(SMnodeMsg *pMnodeMsg);
static int32_t mndProcessDropAcctMsg(SMnodeMsg *pMnodeMsg);
int32_t mndInitAcct(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_ACCT,
.keyType = SDB_KEY_BINARY,
.deployFp = mnodeCreateDefaultAcct,
.encodeFp = (SdbEncodeFp)mnodeAcctActionEncode,
.decodeFp = (SdbDecodeFp)mnodeAcctActionDecode,
.insertFp = (SdbInsertFp)mnodeAcctActionInsert,
.updateFp = (SdbUpdateFp)mnodeAcctActionUpdate,
.deleteFp = (SdbDeleteFp)mnodeAcctActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_ACCT, mndProcessCreateAcctMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_ACCT, mndProcessAlterAcctMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_ACCT, mndProcessDropAcctMsg);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupAcct(SMnode *pMnode) {}
static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) {
SAcctObj acctObj = {0};
tstrncpy(acctObj.acct, TSDB_DEFAULT_USER, TSDB_USER_LEN);
acctObj.createdTime = taosGetTimestampMs();
acctObj.updateTime = acctObj.createdTime;
acctObj.acctId = 1;
acctObj.cfg = (SAcctCfg){.maxUsers = 1024,
.maxDbs = 1024,
.maxTimeSeries = INT32_MAX,
.maxStreams = 8092,
.maxStorage = INT64_MAX,
.accessState = TSDB_VN_ALL_ACCCESS};
SSdbRaw *pRaw = mnodeAcctActionEncode(&acctObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("acct:%s, will be created while deploy sdb", acctObj.acct);
return sdbWrite(pMnode->pSdb, pRaw);
}
static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct) { static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, SDB_ACCT_VER, sizeof(SAcctObj)); SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, SDB_ACCT_VER, sizeof(SAcctObj));
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
...@@ -92,40 +143,20 @@ static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *p ...@@ -92,40 +143,20 @@ static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *p
return 0; return 0;
} }
static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) { static int32_t mndProcessCreateAcctMsg(SMnodeMsg *pMnodeMsg) {
int32_t code = 0; terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
mError("failed to process create acct msg since %s", terrstr());
SAcctObj acctObj = {0}; return -1;
tstrncpy(acctObj.acct, TSDB_DEFAULT_USER, TSDB_USER_LEN);
acctObj.createdTime = taosGetTimestampMs();
acctObj.updateTime = acctObj.createdTime;
acctObj.acctId = 1;
acctObj.cfg = (SAcctCfg){.maxUsers = 1024,
.maxDbs = 1024,
.maxTimeSeries = INT32_MAX,
.maxStreams = 8092,
.maxStorage = INT64_MAX,
.accessState = TSDB_VN_ALL_ACCCESS};
SSdbRaw *pRaw = mnodeAcctActionEncode(&acctObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("acct:%s, will be created while deploy sdb", acctObj.acct);
return sdbWrite(pMnode->pSdb, pRaw);
} }
int32_t mndInitAcct(SMnode *pMnode) { static int32_t mndProcessAlterAcctMsg(SMnodeMsg *pMnodeMsg) {
SSdbTable table = {.sdbType = SDB_ACCT, terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
.keyType = SDB_KEY_BINARY, mError("failed to process create acct msg since %s", terrstr());
.deployFp = mnodeCreateDefaultAcct, return -1;
.encodeFp = (SdbEncodeFp)mnodeAcctActionEncode,
.decodeFp = (SdbDecodeFp)mnodeAcctActionDecode,
.insertFp = (SdbInsertFp)mnodeAcctActionInsert,
.updateFp = (SdbUpdateFp)mnodeAcctActionUpdate,
.deleteFp = (SdbDeleteFp)mnodeAcctActionDelete};
return sdbSetTable(pMnode->pSdb, table);
} }
void mndCleanupAcct(SMnode *pMnode) {} static int32_t mndProcessDropAcctMsg(SMnodeMsg *pMnodeMsg) {
terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
mError("failed to process create acct msg since %s", terrstr());
return -1;
}
\ No newline at end of file
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndCluster.h" #include "mndCluster.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndShow.h"
#define SDB_CLUSTER_VER 1 #define SDB_CLUSTER_VER 1
...@@ -94,6 +95,71 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -94,6 +95,71 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
// static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
// SSchema *pSchema = pMeta->schema;
// pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "clusterId");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
// pShow->bytes[cols] = 8;
// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
// strcpy(pSchema[cols].name, "create_time");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
// pMeta->numOfColumns = htons(cols);
// strcpy(pMeta->tableFname, "show cluster");
// pShow->numOfColumns = cols;
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) {
// pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
// }
// pShow->numOfRows = 1;
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
// return 0;
// }
// static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
// int32_t numOfRows = 0;
// int32_t cols = 0;
// char * pWrite;
// SClusterObj *pCluster = NULL;
// while (numOfRows < rows) {
// pShow->pIter = mnodeGetNextCluster(pShow->pIter, &pCluster);
// if (pCluster == NULL) break;
// cols = 0;
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pCluster->uid, TSDB_CLUSTER_ID_LEN);
// cols++;
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int64_t *) pWrite = pCluster->createdTime;
// cols++;
// mnodeDecClusterRef(pCluster);
// numOfRows++;
// }
// mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
// pShow->numOfReads += numOfRows;
// return numOfRows;
// }
// static void mnodeCancelGetNextCluster(void *pIter) {
// sdbFreeIter(tsClusterSdb, pIter);
// }
int32_t mndInitCluster(SMnode *pMnode) { int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CLUSTER, SSdbTable table = {.sdbType = SDB_CLUSTER,
.keyType = SDB_KEY_INT32, .keyType = SDB_KEY_INT32,
...@@ -104,6 +170,9 @@ int32_t mndInitCluster(SMnode *pMnode) { ...@@ -104,6 +170,9 @@ int32_t mndInitCluster(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndClusterActionUpdate, .updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete}; .deleteFp = (SdbDeleteFp)mndClusterActionDelete};
// mndAddShowMetaHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeGetClusterMeta);
// mndAddShowRetrieveHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeRetrieveClusters);
// mndAddShowFreeIterHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeCancelGetNextCluster);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
......
...@@ -14,8 +14,39 @@ ...@@ -14,8 +14,39 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "mndDb.h"
#include "mndInt.h"
int32_t mndInitDb(SMnode *pMnode) { return 0; } static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg);
void mndCleanupDb(SMnode *pMnode) {}
\ No newline at end of file int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_USE_DB, mnodeProcessUseMsg);
return 0;
}
void mndCleanupDb(SMnode *pMnode) {}
SDbObj *mndAcquireDb(SMnode *pMnode, char *db) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_DB, db);
}
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pDb);
}
static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SUseDbMsg *pUse = pMsg->rpcMsg.pCont;
strncpy(pMsg->db, pUse->db, TSDB_FULL_DB_NAME_LEN);
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
if (pDb != NULL) {
mndReleaseDb(pMnode, pDb);
return 0;
} else {
mError("db:%s, failed to process use db msg since %s", pMsg->db, terrstr());
return -1;
}
}
...@@ -226,7 +226,8 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) { ...@@ -226,7 +226,8 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
} }
static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SStatusMsg *pStatus = pMsg->rpcMsg.pCont; SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
mndParseStatusMsg(pStatus); mndParseStatusMsg(pStatus);
...@@ -315,11 +316,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { ...@@ -315,11 +316,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndProcessCreateDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessConfigDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { return 0; }
int32_t mndInitDnode(SMnode *pMnode) { int32_t mndInitDnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_DNODE, SSdbTable table = {.sdbType = SDB_DNODE,
......
...@@ -103,9 +103,9 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { ...@@ -103,9 +103,9 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
static int32_t mndProcessCreateMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { return 0; }
int32_t mndInitMnode(SMnode *pMnode) { int32_t mndInitMnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_MNODE, SSdbTable table = {.sdbType = SDB_MNODE,
...@@ -135,4 +135,6 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) { ...@@ -135,4 +135,6 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
sdbRelease(pSdb, pMnodeObj); sdbRelease(pSdb, pMnodeObj);
return true; return true;
} }
\ No newline at end of file
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {}
\ No newline at end of file
...@@ -14,8 +14,322 @@ ...@@ -14,8 +14,322 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "mndShow.h"
#include "mndInt.h"
int32_t mndInitShow(SMnode *pMnode) { return 0; } static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg);
void mndCleanupShow(SMnode *pMnode) {} static int32_t mndProcessRetrieveMsg( SMnodeMsg *pMsg);
\ No newline at end of file static bool mndCheckRetrieveFinished(SShowObj *pShow);
static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow);
static void mndFreeShowObj(void *ppShow);
static char *mndShowStr(int32_t showType);
int32_t mndInitShow(SMnode *pMnode) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->cache = taosCacheInit(TSDB_CACHE_PTR_KEY, 5, true, mndFreeShowObj, "show");
if (pMgmt->cache == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to alloc show cache since %s", terrstr());
return -1;
}
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_SHOW, mndProcessShowMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_SHOW_RETRIEVE, mndProcessRetrieveMsg);
return 0;
}
void mndCleanupShow(SMnode *pMnode) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
if (pMgmt->cache != NULL) {
taosCacheCleanup(pMgmt->cache);
pMgmt->cache = NULL;
}
}
static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow) {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowObj **ppShow = taosCacheAcquireByKey(pMgmt->cache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
if (ppShow) {
mTrace("show:%d, data:%p acquired from cache", pShow->id, ppShow);
return 0;
}
return -1;
}
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
SMnode *pMnode = pShow->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowObj **ppShow = (SShowObj **)pShow->ppShow;
taosCacheRelease(pMgmt->cache, (void **)(&ppShow), forceRemove);
mDebug("show:%d, data:%p released from cache, force:%d", pShow->id, ppShow, forceRemove);
}
static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
int32_t lifeSpan = pMnode->shellActivityTimer * 6 * 1000;
TSDB_CACHE_PTR_TYPE val = (TSDB_CACHE_PTR_TYPE)pShow;
pShow->id = atomic_add_fetch_32(&pMgmt->showId, 1);
SShowObj **ppShow =
taosCachePut(pMgmt->cache, &val, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), lifeSpan);
if (ppShow == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to put into cache", pShow->id);
return -1;
}
mTrace("show:%d, data:%p put into cache", pShow->id, ppShow);
return 0;
}
static void mndFreeShowObj(void *ppShow) {
SShowObj *pShow = *(SShowObj **)ppShow;
SMnode *pMnode = pShow->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type];
if (freeFp != NULL) {
if (pShow->pVgIter != NULL) {
// only used in 'show vnodes "ep"'
(*freeFp)(pMnode, pShow->pVgIter);
}
if (pShow->pIter != NULL) {
(*freeFp)(pMnode, pShow->pIter);
}
}
mDebug("show:%d, data:%p destroyed", pShow->id, ppShow);
tfree(pShow);
}
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
SMnode *pMnode = pMnodeMsg->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowMsg *pMsg = pMnodeMsg->rpcMsg.pCont;
int8_t type = pMsg->type;
uint16_t payloadLen = htonl(pMsg->payloadLen);
if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
mError("failed to process show msg since %s", terrstr());
return -1;
}
ShowMetaFp metaFp = pMgmt->metaFps[type];
if (metaFp == NULL) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
mError("failed to process show-meta msg:%s since no message handle", mndShowStr(type));
return -1;
}
int32_t size = sizeof(SShowObj) + payloadLen;
SShowObj *pShow = calloc(1, size);
if (pShow != NULL) {
pShow->pMnode = pMnode;
pShow->type = type;
pShow->payloadLen = payloadLen;
memcpy(pShow->db, pMsg->db, TSDB_FULL_DB_NAME_LEN);
memcpy(pShow->payload, pMsg->payload, payloadLen);
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
return -1;
}
if (mndPutShowObj(pMnode, pShow) == 0) {
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
free(pShow);
return -1;
}
size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SShowRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
mndReleaseShowObj(pShow, true);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to process show-meta msg:%s since malloc rsp error", pShow->id, mndShowStr(type));
return -1;
}
pRsp->qhandle = htobe64((uint64_t)pShow);
int32_t code = (*metaFp)(pMnodeMsg,pShow, &pRsp->tableMeta);
mDebug("show:%d, type:%s, get meta finished, numOfRows:%d cols:%d result:%s", pShow->id, mndShowStr(type),
pShow->numOfRows, pShow->numOfColumns, tstrerror(code));
if (code == TSDB_CODE_SUCCESS) {
pMnodeMsg->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
pMnodeMsg->pCont = pRsp;
mndReleaseShowObj(pShow, false);
return TSDB_CODE_SUCCESS;
} else {
rpcFreeCont(pRsp);
mndReleaseShowObj(pShow, true);
return code;
}
}
static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
SMnode *pMnode = pMnodeMsg->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
int32_t rowsToRead = 0;
int32_t size = 0;
int32_t rowsRead = 0;
SRetrieveTableMsg *pRetrieve = pMnodeMsg->rpcMsg.pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
/*
* in case of server restart, apps may hold qhandle created by server before
* restart, which is actually invalid, therefore, signature check is required.
*/
if (mndAcquireShowObj(pMnode, pShow) != 0) {
terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
mError("failed to process show-retrieve msg:%p since %s", pShow, terrstr());
return -1;
}
ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
if (retrieveFp == NULL) {
mndReleaseShowObj(pShow, false);
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr());
return -1;
}
mDebug("show:%d, type:%s, start retrieve data, numOfReads:%d numOfRows:%d", pShow->id, mndShowStr(pShow->type),
pShow->numOfReads, pShow->numOfRows);
if (mndCheckRetrieveFinished(pShow)) {
mDebug("show:%d, read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow->numOfReads, pShow->numOfRows);
pShow->numOfReads = pShow->numOfRows;
}
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsToRead = pShow->numOfRows - pShow->numOfReads;
}
/* return no more than 100 tables in one round trip */
if (rowsToRead > 100) rowsToRead = 100;
/*
* the actual number of table may be larger than the value of pShow->numOfRows, if a query is
* issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
*/
if (rowsToRead < 0) rowsToRead = 0;
size = pShow->rowSize * rowsToRead;
size += 100;
SRetrieveTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
mndReleaseShowObj(pShow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr());
return -1;
}
// if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsRead = (*retrieveFp)(pMnodeMsg, pShow, pRsp->data, rowsToRead);
}
mDebug("show:%d, stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead);
pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = (int16_t)htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision
pMnodeMsg->pCont = pRsp;
pMnodeMsg->contLen = size;
if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
pRsp->completed = 1;
mDebug("%p, retrieve completed", pShow);
mndReleaseShowObj(pShow, true);
} else {
mDebug("%p, retrieve not completed yet", pShow);
mndReleaseShowObj(pShow, false);
}
return TSDB_CODE_SUCCESS;
}
static char *mndShowStr(int32_t showType) {
switch (showType) {
case TSDB_MGMT_TABLE_ACCT:
return "show accounts";
case TSDB_MGMT_TABLE_USER:
return "show users";
case TSDB_MGMT_TABLE_DB:
return "show databases";
case TSDB_MGMT_TABLE_TABLE:
return "show tables";
case TSDB_MGMT_TABLE_DNODE:
return "show dnodes";
case TSDB_MGMT_TABLE_MNODE:
return "show mnodes";
case TSDB_MGMT_TABLE_VGROUP:
return "show vgroups";
case TSDB_MGMT_TABLE_METRIC:
return "show stables";
case TSDB_MGMT_TABLE_MODULE:
return "show modules";
case TSDB_MGMT_TABLE_QUERIES:
return "show queries";
case TSDB_MGMT_TABLE_STREAMS:
return "show streams";
case TSDB_MGMT_TABLE_VARIABLES:
return "show configs";
case TSDB_MGMT_TABLE_CONNS:
return "show connections";
case TSDB_MGMT_TABLE_SCORES:
return "show scores";
case TSDB_MGMT_TABLE_GRANTS:
return "show grants";
case TSDB_MGMT_TABLE_VNODES:
return "show vnodes";
case TSDB_MGMT_TABLE_CLUSTER:
return "show clusters";
case TSDB_MGMT_TABLE_STREAMTABLES:
return "show streamtables";
case TSDB_MGMT_TABLE_TP:
return "show topics";
default:
return "undefined";
}
}
static bool mndCheckRetrieveFinished(SShowObj *pShow) {
if (pShow->pIter == NULL && pShow->numOfReads != 0) {
return true;
}
return false;
}
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
if (rows < capacity) {
for (int32_t i = 0; i < numOfCols; ++i) {
memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows);
}
}
}
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->metaFps[showType] = fp;
}
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->retrieveFps[showType] = fp;
}
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->freeIterFps[showType] = fp;
}
...@@ -14,12 +14,76 @@ ...@@ -14,12 +14,76 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndUser.h"
#include "mndShow.h"
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "tkey.h" #include "tkey.h"
#define SDB_USER_VER 1 #define SDB_USER_VER 1
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
static SSdbRaw *mndUserActionEncode(SUserObj *pUser);
static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw);
static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser);
static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser);
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDstUser);
static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, SMnodeMsg *pMsg);
static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg);
static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg);
int32_t mndInitUser(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
.deployFp = (SdbDeployFp)mndCreateDefaultUsers,
.encodeFp = (SdbEncodeFp)mndUserActionEncode,
.decodeFp = (SdbDecodeFp)mndUserActionDecode,
.insertFp = (SdbInsertFp)mndUserActionInsert,
.updateFp = (SdbUpdateFp)mndUserActionUpdate,
.deleteFp = (SdbDeleteFp)mndUserActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_USER, mndProcessCreateUserMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_USER, mndProcessAlterUserMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_USER, mndProcessDropUserMsg);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupUser(SMnode *pMnode) {}
static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char *pass) {
SUserObj userObj = {0};
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
if (strcmp(user, TSDB_DEFAULT_USER) == 0) {
userObj.superAuth = 1;
}
SSdbRaw *pRaw = mndUserActionEncode(&userObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("user:%s, will be created while deploy sdb", userObj.user);
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1;
}
if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1;
}
return 0;
}
static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SUserObj)); SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SUserObj));
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
...@@ -30,7 +94,7 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { ...@@ -30,7 +94,7 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN) SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime) SDB_SET_INT64(pRaw, dataPos, pUser->createdTime)
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime) SDB_SET_INT64(pRaw, dataPos, pUser->updateTime)
SDB_SET_INT8(pRaw, dataPos, pUser->rootAuth) SDB_SET_INT8(pRaw, dataPos, pUser->superAuth)
SDB_SET_DATALEN(pRaw, dataPos); SDB_SET_DATALEN(pRaw, dataPos);
return pRaw; return pRaw;
...@@ -56,7 +120,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { ...@@ -56,7 +120,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN) SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->rootAuth) SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superAuth)
return pRow; return pRow;
} }
...@@ -70,12 +134,14 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) { ...@@ -70,12 +134,14 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) {
return -1; return -1;
} }
pUser->pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct); SAcctObj *pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct);
if (pUser->pAcct == NULL) { if (pAcct == NULL) {
terrno = TSDB_CODE_MND_ACCT_NOT_EXIST; terrno = TSDB_CODE_MND_ACCT_NOT_EXIST;
mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr()); mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr());
return -1; return -1;
} }
pUser->acctId = pAcct->acctId;
sdbRelease(pSdb, pAcct);
return 0; return 0;
} }
...@@ -87,11 +153,6 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { ...@@ -87,11 +153,6 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
pUser->prohibitDbHash = NULL; pUser->prohibitDbHash = NULL;
} }
if (pUser->pAcct != NULL) {
sdbRelease(pSdb, pUser->pAcct);
pUser->pAcct = NULL;
}
return 0; return 0;
} }
...@@ -102,40 +163,18 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDs ...@@ -102,40 +163,18 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDs
memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN); memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN);
pSrcUser->createdTime = pDstUser->createdTime; pSrcUser->createdTime = pDstUser->createdTime;
pSrcUser->updateTime = pDstUser->updateTime; pSrcUser->updateTime = pDstUser->updateTime;
pSrcUser->rootAuth = pDstUser->rootAuth; pSrcUser->superAuth = pDstUser->superAuth;
return 0; return 0;
} }
static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char *pass) { SUserObj *mndAcquireUser(SMnode *pMnode, char *userName) {
SUserObj userObj = {0}; SSdb *pSdb = pMnode->pSdb;
tstrncpy(userObj.user, user, TSDB_USER_LEN); return sdbAcquire(pSdb, SDB_USER, userName);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
if (strcmp(user, TSDB_DEFAULT_USER) == 0) {
userObj.rootAuth = 1;
}
SSdbRaw *pRaw = mndUserActionEncode(&userObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("user:%s, will be created while deploy sdb", userObj.user);
return sdbWrite(pMnode->pSdb, pRaw);
} }
static int32_t mndCreateDefaultUsers(SMnode *pMnode) { void mndReleaseUser(SMnode *pMnode, SUserObj *pUser) {
if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { SSdb *pSdb = pMnode->pSdb;
return -1; sdbRelease(pSdb, pUser);
}
if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1;
}
return 0;
} }
static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, SMnodeMsg *pMsg) { static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, SMnodeMsg *pMsg) {
...@@ -145,7 +184,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, ...@@ -145,7 +184,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass); taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs(); userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime; userObj.updateTime = userObj.createdTime;
userObj.rootAuth = 0; userObj.superAuth = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
...@@ -183,7 +222,8 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, ...@@ -183,7 +222,8 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
return 0; return 0;
} }
static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) { static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
if (pCreate->user[0] == 0) { if (pCreate->user[0] == 0) {
...@@ -224,30 +264,14 @@ static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) { ...@@ -224,30 +264,14 @@ static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
int32_t mndInitUser(SMnode *pMnode) { static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg) {
SSdbTable table = {.sdbType = SDB_USER, terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
.keyType = SDB_KEY_BINARY, mError("failed to process alter user msg since %s", terrstr());
.deployFp = (SdbDeployFp)mndCreateDefaultUsers, return -1;
.encodeFp = (SdbEncodeFp)mndUserActionEncode,
.decodeFp = (SdbDecodeFp)mndUserActionDecode,
.insertFp = (SdbInsertFp)mndUserActionInsert,
.updateFp = (SdbUpdateFp)mndUserActionUpdate,
.deleteFp = (SdbDeleteFp)mndUserActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_USER, mndProcessCreateUserMsg);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupUser(SMnode *pMnode) {}
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_USER, &userName);
}
void mndReleaseUser(SMnode *pMnode, SUserObj *pUser) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pUser);
} }
static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
mError("failed to process drop user msg since %s", terrstr());
return -1;
}
\ No newline at end of file
...@@ -206,6 +206,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -206,6 +206,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->sver = pOption->sver; pMnode->sver = pOption->sver;
pMnode->statusInterval = pOption->statusInterval; pMnode->statusInterval = pOption->statusInterval;
pMnode->mnodeEqualVnodeNum = pOption->mnodeEqualVnodeNum; pMnode->mnodeEqualVnodeNum = pOption->mnodeEqualVnodeNum;
pMnode->shellActivityTimer = pOption->shellActivityTimer;
pMnode->timezone = strdup(pOption->timezone); pMnode->timezone = strdup(pOption->timezone);
pMnode->locale = strdup(pOption->locale); pMnode->locale = strdup(pOption->locale);
pMnode->charset = strdup(pOption->charset); pMnode->charset = strdup(pOption->charset);
...@@ -386,7 +387,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { ...@@ -386,7 +387,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
goto PROCESS_RPC_END; goto PROCESS_RPC_END;
} }
code = (*fp)(pMnode, pMsg); code = (*fp)(pMsg);
if (code != 0) { if (code != 0) {
code = terrno; code = terrno;
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
......
...@@ -7,6 +7,7 @@ target_include_directories( ...@@ -7,6 +7,7 @@ target_include_directories(
) )
target_link_libraries( target_link_libraries(
meta meta
PUBLIC sqlite
PUBLIC common PUBLIC common
PUBLIC tkv PUBLIC tkv
) )
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_META_DB_H_ #define _TD_META_DB_H_
#include "rocksdb/c.h" #include "rocksdb/c.h"
#include "sqlite3.h"
#include "meta.h" #include "meta.h"
...@@ -29,7 +30,7 @@ typedef struct { ...@@ -29,7 +30,7 @@ typedef struct {
rocksdb_t *nameDb; // name -> uid rocksdb_t *nameDb; // name -> uid
rocksdb_t *tagDb; // uid -> tag rocksdb_t *tagDb; // uid -> tag
rocksdb_t *schemaDb; // uid+version -> schema rocksdb_t *schemaDb; // uid+version -> schema
rocksdb_t *mapDb; // suid -> uid_list sqlite3 * mapDb; // suid -> uid_list
} meta_db_t; } meta_db_t;
int metaOpenDB(SMeta *pMeta); int metaOpenDB(SMeta *pMeta);
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema); static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema);
static void metaGetSchemaDBKey(char key[], tb_uid_t uid, int sversion); static void metaGetSchemaDBKey(char key[], tb_uid_t uid, int sversion);
static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid); // static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid);
#define SCHEMA_KEY_LEN (sizeof(tb_uid_t) + sizeof(int)) #define SCHEMA_KEY_LEN (sizeof(tb_uid_t) + sizeof(int))
...@@ -65,8 +65,14 @@ int metaOpenDB(SMeta *pMeta) { ...@@ -65,8 +65,14 @@ int metaOpenDB(SMeta *pMeta) {
META_OPEN_DB_IMPL(pMeta->pDB->schemaDb, options, dir, err); META_OPEN_DB_IMPL(pMeta->pDB->schemaDb, options, dir, err);
// mapDb // mapDb
sprintf(dir, "%s/map_db", pMeta->path); sprintf(dir, "%s/meta.db", pMeta->path);
META_OPEN_DB_IMPL(pMeta->pDB->mapDb, options, dir, err); if (sqlite3_open(dir, &(pMeta->pDB->mapDb)) != SQLITE_OK) {
// TODO
}
// // set read uncommitted
sqlite3_exec(pMeta->pDB->mapDb, "PRAGMA read_uncommitted=true;", 0, 0, 0);
sqlite3_exec(pMeta->pDB->mapDb, "BEGIN;", 0, 0, 0);
rocksdb_options_destroy(options); rocksdb_options_destroy(options);
return 0; return 0;
...@@ -82,7 +88,12 @@ int metaOpenDB(SMeta *pMeta) { ...@@ -82,7 +88,12 @@ int metaOpenDB(SMeta *pMeta) {
void metaCloseDB(SMeta *pMeta) { void metaCloseDB(SMeta *pMeta) {
if (pMeta->pDB) { if (pMeta->pDB) {
META_CLOSE_DB_IMPL(pMeta->pDB->mapDb); if (pMeta->pDB->mapDb) {
sqlite3_exec(pMeta->pDB->mapDb, "COMMIT;", 0, 0, 0);
sqlite3_close(pMeta->pDB->mapDb);
pMeta->pDB->mapDb = NULL;
}
META_CLOSE_DB_IMPL(pMeta->pDB->schemaDb); META_CLOSE_DB_IMPL(pMeta->pDB->schemaDb);
META_CLOSE_DB_IMPL(pMeta->pDB->tagDb); META_CLOSE_DB_IMPL(pMeta->pDB->tagDb);
META_CLOSE_DB_IMPL(pMeta->pDB->nameDb); META_CLOSE_DB_IMPL(pMeta->pDB->nameDb);
...@@ -97,6 +108,7 @@ int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) { ...@@ -97,6 +108,7 @@ int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) {
char * err = NULL; char * err = NULL;
size_t size; size_t size;
char pBuf[1024]; // TODO char pBuf[1024]; // TODO
char sql[128];
rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create(); rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
...@@ -124,8 +136,12 @@ int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) { ...@@ -124,8 +136,12 @@ int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) {
// save schemaDB // save schemaDB
metaSaveSchemaDB(pMeta, uid, pTbOptions->stbCfg.pSchema); metaSaveSchemaDB(pMeta, uid, pTbOptions->stbCfg.pSchema);
// save mapDB (really need?) // // save mapDB (really need?)
rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&uid), sizeof(uid), "", 0, &err); // rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&uid), sizeof(uid), "", 0, &err);
sprintf(sql, "create table st_%" PRIu64 " (uid BIGINT);", uid);
if (sqlite3_exec(pMeta->pDB->mapDb, sql, NULL, NULL, &err) != SQLITE_OK) {
// fprintf(stderr, "Failed to create table, since %s\n", err);
}
break; break;
case META_CHILD_TABLE: case META_CHILD_TABLE:
// save tagDB // save tagDB
...@@ -133,7 +149,10 @@ int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) { ...@@ -133,7 +149,10 @@ int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) {
kvRowLen(pTbOptions->ctbCfg.pTag), &err); kvRowLen(pTbOptions->ctbCfg.pTag), &err);
// save mapDB // save mapDB
metaSaveMapDB(pMeta, pTbOptions->ctbCfg.suid, uid); sprintf(sql, "insert into st_%" PRIu64 " values (%" PRIu64 ");", pTbOptions->ctbCfg.suid, uid);
if (sqlite3_exec(pMeta->pDB->mapDb, sql, NULL, NULL, &err) != SQLITE_OK) {
fprintf(stderr, "failed to insert data, since %s\n", err);
}
break; break;
default: default:
ASSERT(0); ASSERT(0);
...@@ -172,32 +191,32 @@ static void metaGetSchemaDBKey(char *key, tb_uid_t uid, int sversion) { ...@@ -172,32 +191,32 @@ static void metaGetSchemaDBKey(char *key, tb_uid_t uid, int sversion) {
*(int *)POINTER_SHIFT(key, sizeof(tb_uid_t)) = sversion; *(int *)POINTER_SHIFT(key, sizeof(tb_uid_t)) = sversion;
} }
static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid) { // static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid) {
size_t vlen; // size_t vlen;
char * val; // char * val;
char * err = NULL; // char * err = NULL;
rocksdb_readoptions_t *ropt = rocksdb_readoptions_create(); // rocksdb_readoptions_t *ropt = rocksdb_readoptions_create();
val = rocksdb_get(pMeta->pDB->mapDb, ropt, (char *)(&suid), sizeof(suid), &vlen, &err); // val = rocksdb_get(pMeta->pDB->mapDb, ropt, (char *)(&suid), sizeof(suid), &vlen, &err);
rocksdb_readoptions_destroy(ropt); // rocksdb_readoptions_destroy(ropt);
void *nval = malloc(vlen + sizeof(uid)); // void *nval = malloc(vlen + sizeof(uid));
if (nval == NULL) { // if (nval == NULL) {
return -1; // return -1;
} // }
if (vlen) { // if (vlen) {
memcpy(nval, val, vlen); // memcpy(nval, val, vlen);
} // }
memcpy(POINTER_SHIFT(nval, vlen), (void *)(&uid), sizeof(uid)); // memcpy(POINTER_SHIFT(nval, vlen), (void *)(&uid), sizeof(uid));
rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create(); // rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
rocksdb_writeoptions_disable_WAL(wopt, 1); // rocksdb_writeoptions_disable_WAL(wopt, 1);
rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&suid), sizeof(suid), nval, vlen + sizeof(uid), &err); // rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&suid), sizeof(suid), nval, vlen + sizeof(uid), &err);
rocksdb_writeoptions_destroy(wopt); // rocksdb_writeoptions_destroy(wopt);
free(nval); // free(nval);
return 0; // return 0;
} // }
\ No newline at end of file \ No newline at end of file
...@@ -507,7 +507,6 @@ void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) { ...@@ -507,7 +507,6 @@ void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) {
} }
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
#if 0
SRpcConn *pConn = (SRpcConn *)thandle; SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn->user[0] == 0) return -1; if (pConn->user[0] == 0) return -1;
...@@ -516,9 +515,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { ...@@ -516,9 +515,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
// pInfo->serverIp = pConn->destIp; // pInfo->serverIp = pConn->destIp;
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
#else
strcpy(pInfo->user, "root");
#endif
return 0; return 0;
} }
......
...@@ -410,3 +410,10 @@ char *taosIpStr(uint32_t ipInt) { ...@@ -410,3 +410,10 @@ char *taosIpStr(uint32_t ipInt) {
return ipStr; return ipStr;
} }
void taosIp2String(uint32_t ip, char *str) {
sprintf(str, "%u.%u.%u.%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24));
}
void taosIpPort2String(uint32_t ip, uint16_t port, char *str) {
sprintf(str, "%u.%u.%u.%u:%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24), port);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册