提交 f2a36718 编写于 作者: H hzcheng

Merge branch '2.0' into feature/2.0tsdb

CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
SET(CMAKE_C_STANDARD 11)
SET(CMAKE_VERBOSE_MAKEFILE ON)
#
# If need to set debug options
# 1.Generate debug version:
# mkdir debug; cd debug;
# cmake -DCMAKE_BUILD_TYPE=Debug ..
# 2.Generate release version:
# mkdir release; cd release;
# cmake -DCMAKE_BUILD_TYPE=Release ..
#
#
# If it is a Windows operating system
# 1.Use command line tool of VS2013 or higher version
# mkdir build; cd build;
# cmake -G "NMake Makefiles" ..
# nmake install
# 2.Use the VS development interface tool
# mkdir build; cd build;
# cmake -A x64 ..
# open the file named TDengine.sln
#
SET(TD_CLUSTER FALSE)
SET(TD_ACCOUNT FALSE)
SET(TD_GRANT FALSE)
SET(TD_COVER FALSE)
SET(TD_PAGMODE_LITE FALSE)
SET(TD_GODLL FALSE)
IF (${DLLTYPE} MATCHES "go")
ADD_DEFINITIONS(-D_TD_GO_DLL_)
MESSAGE(STATUS "input dll type: " ${DLLTYPE})
SET(TD_GODLL TRUE)
ENDIF ()
IF (NOT DEFINED TD_CLUSTER)
MESSAGE(STATUS "Build the Lite Version")
SET(TD_CLUSTER FALSE)
SET(TD_EDGE TRUE)
SET(TD_COMMUNITY_DIR ${PROJECT_SOURCE_DIR})
MESSAGE(STATUS "Community directory: " ${TD_COMMUNITY_DIR})
# Set macro definitions according to os platform
SET(TD_LINUX_64 FALSE)
SET(TD_LINUX_32 FALSE)
SET(TD_ARM FALSE)
SET(TD_ARM_64 FALSE)
SET(TD_ARM_32 FALSE)
SET(TD_MIPS FALSE)
SET(TD_MIPS_64 FALSE)
SET(TD_MIPS_32 FALSE)
SET(TD_DARWIN_64 FALSE)
SET(TD_WINDOWS_64 FALSE)
SET(TD_PAGMODE_LITE FALSE)
IF (${PAGMODE} MATCHES "lite")
SET(TD_PAGMODE_LITE TRUE)
ENDIF ()
# if generate ARM version:
# cmake -DCPUTYPE=aarch32 .. or cmake -DCPUTYPE=aarch64
IF (${CPUTYPE} MATCHES "aarch32")
SET(TD_ARM TRUE)
SET(TD_ARM_32 TRUE)
SET(TD_PAGMODE_LITE TRUE)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-D_TD_ARM_32_)
ELSEIF (${CPUTYPE} MATCHES "aarch64")
SET(TD_ARM TRUE)
SET(TD_ARM_64 TRUE)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-D_TD_ARM_64_)
ELSEIF (${CPUTYPE} MATCHES "mips64")
SET(TD_MIPS TRUE)
SET(TD_MIPS_64 TRUE)
ADD_DEFINITIONS(-D_TD_MIPS_)
ADD_DEFINITIONS(-D_TD_MIPS_64_)
ELSEIF (${CPUTYPE} MATCHES "x64")
MESSAGE(STATUS "input cpuType: " ${CPUTYPE})
ELSEIF (${CPUTYPE} MATCHES "x86")
MESSAGE(STATUS "input cpuType: " ${CPUTYPE})
ELSE ()
MESSAGE(STATUS "input cpuType: " ${CPUTYPE})
ENDIF ()
#
# Get OS information and store in variable TD_OS_INFO.
#
execute_process(COMMAND chmod 777 ${TD_COMMUNITY_DIR}/packaging/tools/get_os.sh)
execute_process(COMMAND ${TD_COMMUNITY_DIR}/packaging/tools/get_os.sh "" OUTPUT_VARIABLE TD_OS_INFO)
MESSAGE(STATUS "The current os is " ${TD_OS_INFO})
IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_LINUX_64 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/linux)
ADD_DEFINITIONS(-D_M_X64)
MESSAGE(STATUS "The current platform is Linux 64-bit")
ELSEIF (${CMAKE_SIZEOF_VOID_P} MATCHES 4)
IF (TD_ARM)
SET(TD_LINUX_32 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/linux)
#ADD_DEFINITIONS(-D_M_IX86)
MESSAGE(STATUS "The current platform is Linux 32-bit")
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Linux 32-bit, but no ARM not supported yet")
EXIT ()
ENDIF ()
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Linux neither 32-bit nor 64-bit, not supported yet")
EXIT ()
ENDIF ()
ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_DARWIN_64 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/darwin)
MESSAGE(STATUS "The current platform is Darwin 64-bit")
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Darwin 32-bit, not supported yet")
EXIT ()
ENDIF ()
ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_WINDOWS_64 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/windows)
ADD_DEFINITIONS(-D_M_X64)
MESSAGE(STATUS "The current platform is Windows 64-bit")
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Windows 32-bit, not supported yet")
EXIT ()
ENDIF ()
ELSE()
MESSAGE(FATAL_ERROR "The current platform is not Linux/Darwin/Windows, stop compile")
EXIT ()
ENDIF ()
FIND_PROGRAM(TD_MVN_INSTALLED mvn)
IF (TD_MVN_INSTALLED)
MESSAGE(STATUS "MVN is installed and JDBC will be compiled")
ELSE ()
MESSAGE(STATUS "MVN is not installed and JDBC is not compiled")
ENDIF ()
#
# debug flag
#
# ADD_DEFINITIONS(-D_CHECK_HEADER_FILE_)
IF (${MEM_CHECK} MATCHES "true")
ADD_DEFINITIONS(-DTAOS_MEM_CHECK)
ENDIF ()
IF (TD_CLUSTER)
ADD_DEFINITIONS(-DCLUSTER)
ADD_DEFINITIONS(-DTSDB_REPLICA_MAX_NUM=3)
ELSE ()
ADD_DEFINITIONS(-DLITE)
ADD_DEFINITIONS(-DTSDB_REPLICA_MAX_NUM=1)
ENDIF ()
IF (TD_LINUX_64)
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
IF (NOT TD_ARM)
IF (${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
IF (${TD_OS_INFO} MATCHES "Alpine")
MESSAGE(STATUS "The current OS is Alpine, append extra flags")
SET(COMMON_FLAGS "${COMMON_FLAGS} -largp")
link_libraries(/usr/lib/libargp.a)
ADD_DEFINITIONS(-D_ALPINE)
ENDIF ()
ELSEIF (TD_LINUX_32)
IF (NOT TD_ARM)
EXIT ()
ENDIF ()
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ADD_DEFINITIONS(-DUSE_LIBICONV)
IF (${TD_OS_INFO} MATCHES "Alpine")
MESSAGE(STATUS "The current OS is Alpine, add extra flags")
SET(COMMON_FLAGS "${COMMON_FLAGS} -largp")
link_library(/usr/lib/libargp.a)
ADD_DEFINITIONS(-D_ALPINE)
ENDIF ()
ELSEIF (TD_WINDOWS_64)
SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE)
IF (NOT TD_GODLL)
SET(COMMON_FLAGS "/nologo /WX- /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-")
SET(DEBUG_FLAGS "/Zi /W3 /GL")
SET(RELEASE_FLAGS "/W0 /GL")
ENDIF ()
ADD_DEFINITIONS(-DWINDOWS)
ADD_DEFINITIONS(-D__CLEANUP_C)
ADD_DEFINITIONS(-DPTW32_STATIC_LIB)
ADD_DEFINITIONS(-DPTW32_BUILD)
ADD_DEFINITIONS(-D_MBCS -D_CRT_SECURE_NO_DEPRECATE -D_CRT_NONSTDC_NO_DEPRECATE)
ELSEIF (TD_DARWIN_64)
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
ADD_DEFINITIONS(-DDARWIN)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is not support yet, stop compile")
EXIT ()
ENDIF ()
# Set compiler options
SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} ${COMMON_FLAGS} ${DEBUG_FLAGS}")
SET(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} ${COMMON_FLAGS} ${RELEASE_FLAGS}")
# Set c++ compiler options
# SET(COMMON_CXX_FLAGS "${COMMON_FLAGS} -std=c++11")
# SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} ${COMMON_CXX_FLAGS} ${DEBUG_FLAGS}")
# SET(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ${COMMON_CXX_FLAGS} ${RELEASE_FLAGS}")
IF (${CMAKE_BUILD_TYPE} MATCHES "Debug")
MESSAGE(STATUS "Build Debug Version")
ELSEIF (${CMAKE_BUILD_TYPE} MATCHES "Release")
MESSAGE(STATUS "Build Release Version")
ELSE ()
IF (TD_WINDOWS_64)
SET(CMAKE_BUILD_TYPE "Release")
MESSAGE(STATUS "Build Release Version in Windows as default")
ELSE ()
SET(CMAKE_BUILD_TYPE "Debug")
MESSAGE(STATUS "Build Debug Version as default")
ENDIF()
ENDIF ()
#set output directory
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/bin)
SET(TD_TESTS_OUTPUT_DIR ${PROJECT_BINARY_DIR}/test)
SET(TD_MEM_CHECK FALSE)
MESSAGE(STATUS "Operating system dependency directory: " ${TD_OS_DIR})
MESSAGE(STATUS "Project source directory: " ${PROJECT_SOURCE_DIR})
MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH})
MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH})
SET(TD_COMMUNITY_DIR ${PROJECT_SOURCE_DIR})
MESSAGE(STATUS "Community directory: " ${TD_COMMUNITY_DIR})
IF (TD_LINUX_64)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR})")
ELSEIF (TD_LINUX_32)
IF (NOT TD_ARM)
EXIT ()
ENDIF ()
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR})")
ELSEIF (TD_WINDOWS_64)
SET(CMAKE_INSTALL_PREFIX C:/TDengine)
IF (NOT TD_GODLL)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/go DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/grafana DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/python DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/tests/examples DESTINATION .)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/packaging/cfg DESTINATION .)
INSTALL(FILES ${TD_COMMUNITY_DIR}/src/inc/taos.h DESTINATION include)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.exp DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .)
#INSTALL(TARGETS taos RUNTIME DESTINATION driver)
#INSTALL(TARGETS shell RUNTIME DESTINATION .)
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-1.0.2-dist.jar DESTINATION connector/jdbc)
ENDIF ()
ELSE ()
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/libtaos.dll DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/libtaos.dll.a DESTINATION driver)
ENDIF ()
ELSEIF (TD_DARWIN_64)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Darwin)")
ENDIF ()
ENDIF ()
INCLUDE(cmake/input.inc)
INCLUDE(cmake/platform.inc)
INCLUDE(cmake/env.inc)
INCLUDE(cmake/define.inc)
INCLUDE(cmake/install.inc)
ADD_SUBDIRECTORY(deps)
ADD_SUBDIRECTORY(src)
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF (TD_CLUSTER)
ADD_DEFINITIONS(-D_CLUSTER)
ADD_DEFINITIONS(-DTSDB_REPLICA_MAX_NUM=3)
ELSE ()
ADD_DEFINITIONS(-DLITE)
ADD_DEFINITIONS(-DTSDB_REPLICA_MAX_NUM=1)
ENDIF ()
IF (TD_ACCOUNT)
ADD_DEFINITIONS(-D_ACCOUNT)
ENDIF ()
IF (TD_GRANT)
ADD_DEFINITIONS(-D_GRANT)
ENDIF ()
IF (TD_GODLL)
ADD_DEFINITIONS(-D_TD_GO_DLL_)
ENDIF ()
IF (TD_MEM_CHECK)
ADD_DEFINITIONS(-DTAOS_MEM_CHECK)
ENDIF ()
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
SET(CMAKE_C_STANDARD 11)
SET(CMAKE_VERBOSE_MAKEFILE ON)
#set output directory
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/bin)
SET(TD_TESTS_OUTPUT_DIR ${PROJECT_BINARY_DIR}/test)
MESSAGE(STATUS "Operating system dependency directory: " ${TD_OS_DIR})
MESSAGE(STATUS "Project source directory: " ${PROJECT_SOURCE_DIR})
MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH})
MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH})
FIND_PROGRAM(TD_MVN_INSTALLED mvn)
IF (TD_MVN_INSTALLED)
MESSAGE(STATUS "MVN is installed and JDBC will be compiled")
ELSE ()
MESSAGE(STATUS "MVN is not installed and JDBC is not compiled")
ENDIF ()
#
# If need to set debug options
# 1.Generate debug version:
# mkdir debug; cd debug;
# cmake -DCMAKE_BUILD_TYPE=Debug ..
# 2.Generate release version:
# mkdir release; cd release;
# cmake -DCMAKE_BUILD_TYPE=Release ..
#
# Set compiler options
SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} ${COMMON_FLAGS} ${DEBUG_FLAGS}")
SET(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} ${COMMON_FLAGS} ${RELEASE_FLAGS}")
# Set c++ compiler options
# SET(COMMON_CXX_FLAGS "${COMMON_FLAGS} -std=c++11")
# SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} ${COMMON_CXX_FLAGS} ${DEBUG_FLAGS}")
# SET(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ${COMMON_CXX_FLAGS} ${RELEASE_FLAGS}")
IF (${CMAKE_BUILD_TYPE} MATCHES "Debug")
MESSAGE(STATUS "Build Debug Version")
ELSEIF (${CMAKE_BUILD_TYPE} MATCHES "Release")
MESSAGE(STATUS "Build Release Version")
ELSE ()
IF (TD_WINDOWS_64)
SET(CMAKE_BUILD_TYPE "Release")
MESSAGE(STATUS "Build Release Version in Windows as default")
ELSE ()
SET(CMAKE_BUILD_TYPE "Debug")
MESSAGE(STATUS "Build Debug Version as default")
ENDIF()
ENDIF ()
\ No newline at end of file
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF (${CLUSTER} MATCHES "true")
SET(TD_CLUSTER TRUE)
MESSAGE(STATUS "Build with cluster plugins")
ELSEIF (${CLUSTER} MATCHES "false")
SET(TD_CLUSTER FALSE)
MESSAGE(STATUS "Build without cluster plugins")
ENDIF ()
IF (${ACCOUNT} MATCHES "true")
SET(TD_ACCOUNT TRUE)
MESSAGE(STATUS "Build with account plugins")
ELSEIF (${ACCOUNT} MATCHES "false")
SET(TD_ACCOUNT FALSE)
MESSAGE(STATUS "Build without account plugins")
ENDIF ()
IF (${COVER} MATCHES "true")
SET(TD_COVER TRUE)
MESSAGE(STATUS "Build with test coverage")
ELSEIF (${COVER} MATCHES "false")
SET(TD_COVER FALSE)
MESSAGE(STATUS "Build without test coverage")
ENDIF ()
IF (${PAGMODE} MATCHES "lite")
SET(TD_PAGMODE_LITE TRUE)
MESSAGE(STATUS "Build with pagmode lite")
ENDIF ()
IF (${DLLTYPE} MATCHES "go")
SET(TD_GODLL TRUE)
MESSAGE(STATUS "input dll type: " ${DLLTYPE})
ENDIF ()
IF (${MEM_CHECK} MATCHES "true")
SET(TD_MEM_CHECK TRUE)
MESSAGE(STATUS "build with memory check")
ENDIF ()
\ No newline at end of file
IF (TD_LINUX_64)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR})")
ELSEIF (TD_LINUX_32)
IF (NOT TD_ARM)
EXIT ()
ENDIF ()
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR})")
ELSEIF (TD_WINDOWS_64)
SET(CMAKE_INSTALL_PREFIX C:/TDengine)
IF (NOT TD_GODLL)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/go DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/grafana DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/src/connector/python DESTINATION connector)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/tests/examples DESTINATION .)
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/packaging/cfg DESTINATION .)
INSTALL(FILES ${TD_COMMUNITY_DIR}/src/inc/taos.h DESTINATION include)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.exp DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .)
#INSTALL(TARGETS taos RUNTIME DESTINATION driver)
#INSTALL(TARGETS shell RUNTIME DESTINATION .)
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-1.0.2-dist.jar DESTINATION connector/jdbc)
ENDIF ()
ELSE ()
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/libtaos.dll DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/libtaos.dll.a DESTINATION driver)
ENDIF ()
ELSEIF (TD_DARWIN_64)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Darwin)")
ENDIF ()
\ No newline at end of file
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
#
# If it is a Windows operating system
# 1.Use command line tool of VS2013 or higher version
# mkdir build; cd build;
# cmake -G "NMake Makefiles" ..
# nmake install
# 2.Use the VS development interface tool
# mkdir build; cd build;
# cmake -A x64 ..
# open the file named TDengine.sln
#
# Set macro definitions according to os platform
SET(TD_LINUX_64 FALSE)
SET(TD_LINUX_32 FALSE)
SET(TD_ARM FALSE)
SET(TD_ARM_64 FALSE)
SET(TD_ARM_32 FALSE)
SET(TD_MIPS FALSE)
SET(TD_MIPS_64 FALSE)
SET(TD_MIPS_32 FALSE)
SET(TD_DARWIN_64 FALSE)
SET(TD_WINDOWS_64 FALSE)
# if generate ARM version:
# cmake -DCPUTYPE=aarch32 .. or cmake -DCPUTYPE=aarch64
IF (${CPUTYPE} MATCHES "aarch32")
SET(TD_ARM TRUE)
SET(TD_ARM_32 TRUE)
SET(TD_PAGMODE_LITE TRUE)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-D_TD_ARM_32_)
ELSEIF (${CPUTYPE} MATCHES "aarch64")
SET(TD_ARM TRUE)
SET(TD_ARM_64 TRUE)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-D_TD_ARM_64_)
ELSEIF (${CPUTYPE} MATCHES "mips64")
SET(TD_MIPS TRUE)
SET(TD_MIPS_64 TRUE)
ADD_DEFINITIONS(-D_TD_MIPS_)
ADD_DEFINITIONS(-D_TD_MIPS_64_)
ELSEIF (${CPUTYPE} MATCHES "x64")
MESSAGE(STATUS "input cpuType: " ${CPUTYPE})
ELSEIF (${CPUTYPE} MATCHES "x86")
MESSAGE(STATUS "input cpuType: " ${CPUTYPE})
ELSE ()
MESSAGE(STATUS "input cpuType: " ${CPUTYPE})
ENDIF ()
#
# Get OS information and store in variable TD_OS_INFO.
#
execute_process(COMMAND chmod 777 ${TD_COMMUNITY_DIR}/packaging/tools/get_os.sh)
execute_process(COMMAND ${TD_COMMUNITY_DIR}/packaging/tools/get_os.sh "" OUTPUT_VARIABLE TD_OS_INFO)
MESSAGE(STATUS "The current os is " ${TD_OS_INFO})
IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_LINUX_64 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/linux)
ADD_DEFINITIONS(-D_M_X64)
MESSAGE(STATUS "The current platform is Linux 64-bit")
ELSEIF (${CMAKE_SIZEOF_VOID_P} MATCHES 4)
IF (TD_ARM)
SET(TD_LINUX_32 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/linux)
#ADD_DEFINITIONS(-D_M_IX86)
MESSAGE(STATUS "The current platform is Linux 32-bit")
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Linux 32-bit, but no ARM not supported yet")
EXIT ()
ENDIF ()
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Linux neither 32-bit nor 64-bit, not supported yet")
EXIT ()
ENDIF ()
ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_DARWIN_64 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/darwin)
MESSAGE(STATUS "The current platform is Darwin 64-bit")
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Darwin 32-bit, not supported yet")
EXIT ()
ENDIF ()
ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_WINDOWS_64 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/windows)
ADD_DEFINITIONS(-D_M_X64)
MESSAGE(STATUS "The current platform is Windows 64-bit")
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is Windows 32-bit, not supported yet")
EXIT ()
ENDIF ()
ELSE()
MESSAGE(FATAL_ERROR "The current platform is not Linux/Darwin/Windows, stop compile")
EXIT ()
ENDIF ()
IF (TD_LINUX_64)
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
IF (NOT TD_ARM)
IF (${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
IF (${TD_OS_INFO} MATCHES "Alpine")
MESSAGE(STATUS "The current OS is Alpine, append extra flags")
SET(COMMON_FLAGS "${COMMON_FLAGS} -largp")
link_libraries(/usr/lib/libargp.a)
ADD_DEFINITIONS(-D_ALPINE)
ENDIF ()
ELSEIF (TD_LINUX_32)
IF (NOT TD_ARM)
EXIT ()
ENDIF ()
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ADD_DEFINITIONS(-DUSE_LIBICONV)
IF (${TD_OS_INFO} MATCHES "Alpine")
MESSAGE(STATUS "The current OS is Alpine, add extra flags")
SET(COMMON_FLAGS "${COMMON_FLAGS} -largp")
link_library(/usr/lib/libargp.a)
ADD_DEFINITIONS(-D_ALPINE)
ENDIF ()
ELSEIF (TD_WINDOWS_64)
SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE)
IF (NOT TD_GODLL)
SET(COMMON_FLAGS "/nologo /WX- /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-")
SET(DEBUG_FLAGS "/Zi /W3 /GL")
SET(RELEASE_FLAGS "/W0 /GL")
ENDIF ()
ADD_DEFINITIONS(-DWINDOWS)
ADD_DEFINITIONS(-D__CLEANUP_C)
ADD_DEFINITIONS(-DPTW32_STATIC_LIB)
ADD_DEFINITIONS(-DPTW32_BUILD)
ADD_DEFINITIONS(-D_MBCS -D_CRT_SECURE_NO_DEPRECATE -D_CRT_NONSTDC_NO_DEPRECATE)
ELSEIF (TD_DARWIN_64)
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
ADD_DEFINITIONS(-DDARWIN)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is not support yet, stop compile")
EXIT ()
ENDIF ()
\ No newline at end of file
......@@ -15,9 +15,16 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb)
#IF (TD_CLUSTER)
# TARGET_LINK_LIBRARIES(taosd dcluster)
#ENDIF ()
IF (TD_ACCOUNT)
TARGET_LINK_LIBRARIES(taosd account)
ENDIF ()
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(taosd grant)
ENDIF ()
IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(taosd cluster)
ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd")
SET(PREPARE_ENV_TARGET "prepare_env_target")
......
......@@ -28,20 +28,9 @@
#include "dnodeRead.h"
#include "dnodeShell.h"
#include "dnodeWrite.h"
#ifdef CLUSTER
#include "account.h"
#include "admin.h"
#include "balance.h"
#include "cluster.h"
#include "grant.h"
#include "mpeer.h"
#include "storage.h"
#include "vpeer.h"
#endif
static int32_t dnodeInitSystem();
static int32_t dnodeInitStorage();
static void dnodeInitPlugins();
static void dnodeCleanupStorage();
static void dnodeCleanUpSystem();
static void dnodeSetRunStatus(SDnodeRunStatus status);
......@@ -51,8 +40,6 @@ static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
void (*dnodeParseParameterKFp)() = NULL;
int32_t main(int32_t argc, char *argv[]) {
dnodeInitPlugins();
// Set global configuration file
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
......@@ -244,15 +231,3 @@ static int32_t dnodeInitStorage() {
}
static void dnodeCleanupStorage() {}
static void dnodeInitPlugins() {
#ifdef CLUSTER
// acctInit();
// adminInit();
// balanceInit();
// clusterInit();
// grantInit();
// mpeerInit();
// storageInit();
#endif
}
......@@ -38,6 +38,10 @@ extern "C" {
#include "ttimer.h"
#include "tutil.h"
struct _vg_obj;
struct _db_obj;
struct _acctObj;
typedef struct {
int32_t mnodeId;
uint32_t privateIp;
......@@ -103,8 +107,6 @@ typedef struct {
int8_t dirty;
} STableInfo;
struct _vg_obj;
typedef struct SSuperTableObj {
STableInfo info;
uint64_t uid;
......@@ -137,8 +139,6 @@ typedef struct {
SSuperTableObj *superTable;
} SChildTableObj;
struct _db_obj;
typedef struct _vg_obj {
uint32_t vgId;
char dbName[TSDB_DB_NAME_LEN + 1];
......@@ -170,10 +170,9 @@ typedef struct _db_obj {
int32_t numOfSuperTables;
SVgObj *pHead;
SVgObj *pTail;
struct _acctObj *pAcct;
} SDbObj;
struct _acctObj;
typedef struct _user_obj {
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
......@@ -213,7 +212,8 @@ typedef struct _acctObj {
SAcctCfg cfg;
int32_t acctId;
int64_t createdTime;
int8_t reserved[15];
int8_t dirty;
int8_t reserved[14];
int8_t updateEnd[1];
SAcctInfo acctInfo;
SDbObj * pHead;
......
......@@ -14,10 +14,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ADD_LIBRARY(mnode ${SRC})
TARGET_LINK_LIBRARIES(mnode trpc tutil pthread)
IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(mnode)
ENDIF ()
ENDIF ()
......@@ -21,18 +21,21 @@ extern "C" {
#endif
#include "mnode.h"
int32_t mgmtInitAccts();
void mgmtCleanUpAccts();
SAcctObj *mgmtGetAcct(char *acctName);
typedef enum {
TSDB_ACCT_USER,
TSDB_ACCT_DB,
TSDB_ACCT_TABLE
} EAcctGrantType;
int32_t mgmtCheckUserLimit(SAcctObj *pAcct);
int32_t mgmtCheckDbLimit(SAcctObj *pAcct);
int32_t mgmtCheckTableLimit(SAcctObj *pAcct, int32_t numOfTimeSeries);
int32_t acctInit();
void acctCleanUp();
SAcctObj *acctGetAcct(char *acctName);
int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type);
int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb);
int32_t mgmtRemoveDbFromAcct(SAcctObj *pAcct, SDbObj *pDb);
int32_t mgmtAddUserIntoAcct(SAcctObj *pAcct, SUserObj *pUser);
int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser);
int32_t acctAddDb(SAcctObj *pAcct, SDbObj *pDb);
int32_t acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb);
int32_t acctAddUser(SAcctObj *pAcct, SUserObj *pUser);
int32_t acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser);
#ifdef __cplusplus
}
......
......@@ -28,6 +28,7 @@ void mgmtCleanUpDbs();
SDbObj *mgmtGetDb(char *db);
SDbObj *mgmtGetDbByTableId(char *db);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
void mgmtDropAllDbs(SAcctObj *pAcct);
// util func
void mgmtAddSuperTableIntoDb(SDbObj *pDb);
......
......@@ -24,19 +24,9 @@ extern "C" {
int32_t mgmtInitDnodes();
void mgmtCleanUpDnodes();
int32_t mgmtGetDnodesNum();
int32_t mgmtUpdateDnode(SDnodeObj *pDnode);
SDnodeObj* mgmtGetDnode(int32_t dnodeId);
SDnodeObj* mgmtGetDnodeByIp(uint32_t ip);
bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode);
bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode);
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType);
void mgmtSetDnodeUnRemove(SDnodeObj *pDnode);
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode);
void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode);
void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId);
void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes);
#ifdef __cplusplus
}
#endif
......
......@@ -19,14 +19,30 @@
#ifdef __cplusplus
"C" {
#endif
#include "mnode.h"
bool mgmtCheckExpired();
void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum);
void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries);
int32_t mgmtCheckTimeSeries(uint32_t timeseries);
int32_t mgmtCheckUserGrant();
int32_t mgmtCheckDbGrant();
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,
TSDB_GRANT_USER,
TSDB_GRANT_DB,
TSDB_GRANT_TIMESERIES,
TSDB_GRANT_DNODE,
TSDB_GRANT_ACCT,
TSDB_GRANT_STORAGE,
TSDB_GRANT_SPEED,
TSDB_GRANT_QUERY_TIME,
TSDB_GRANT_CONNS,
TSDB_GRANT_STREAMS,
TSDB_GRANT_CPU_CORES,
} EGrantType;
int32_t grantInit();
void grantCleanUp();
void grantParseParameter();
int32_t grantCheck(EGrantType grant);
void grantReset(EGrantType grant, uint64_t value);
void grantAdd(EGrantType grant, uint64_t value);
void grantRestore(EGrantType grant, uint64_t value);
#ifdef __cplusplus
}
......
......@@ -63,6 +63,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper);
void *sdbGetRow(void *handle, void *key);
void *sdbFetchRow(void *handle, void *pNode, void **ppRow);
int64_t sdbGetNumOfRows(void *handle);
int64_t sdbGetId(void *handle);
uint64_t sdbGetVersion();
#ifdef __cplusplus
......
......@@ -25,6 +25,8 @@ int32_t mgmtInitUsers();
void mgmtCleanUpUsers();
SUserObj *mgmtGetUser(char *name);
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp);
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
void mgmtDropAllUsers(SAcctObj *pAcct);
#ifdef __cplusplus
}
......
......@@ -15,21 +15,29 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "mnode.h"
#include "mgmtAcct.h"
#ifndef _ACCOUNT
static SAcctObj tsAcctObj;
static SAcctObj tsAcctObj = {0};
int32_t (*mgmtInitAcctsFp)() = NULL;
void (*mgmtCleanUpAcctsFp)() = NULL;
SAcctObj *(*mgmtGetAcctFp)(char *acctName) = NULL;
int32_t (*mgmtCheckUserLimitFp)(SAcctObj *pAcct) = NULL;
int32_t (*mgmtCheckDbLimitFp)(SAcctObj *pAcct) = NULL;
int32_t (*mgmtCheckTableLimitFp)(SAcctObj *pAcct, int32_t numOfTimeSeries) = NULL;
int32_t acctInit() {
tsAcctObj.acctId = 0;
strcpy(tsAcctObj.user, "root");
return TSDB_CODE_SUCCESS;
}
void acctCleanUp() {}
SAcctObj *acctGetAcct(char *acctName) { return &tsAcctObj; }
int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type) { return TSDB_CODE_SUCCESS; }
#endif
int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb) {
int32_t acctAddDb(SAcctObj *pAcct, SDbObj *pDb) {
pthread_mutex_lock(&pAcct->mutex);
pDb->next = pAcct->pHead;
pDb->prev = NULL;
pDb->pAcct = pAcct;
if (pAcct->pHead) {
pAcct->pHead->prev = pDb;
......@@ -42,7 +50,7 @@ int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb) {
return 0;
}
int32_t mgmtRemoveDbFromAcct(SAcctObj *pAcct, SDbObj *pDb) {
int32_t acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) {
pthread_mutex_lock(&pAcct->mutex);
if (pDb->prev) {
pDb->prev->next = pDb->next;
......@@ -62,7 +70,7 @@ int32_t mgmtRemoveDbFromAcct(SAcctObj *pAcct, SDbObj *pDb) {
return 0;
}
int32_t mgmtAddUserIntoAcct(SAcctObj *pAcct, SUserObj *pUser) {
int32_t acctAddUser(SAcctObj *pAcct, SUserObj *pUser) {
pthread_mutex_lock(&pAcct->mutex);
pUser->next = pAcct->pUser;
pUser->prev = NULL;
......@@ -79,7 +87,7 @@ int32_t mgmtAddUserIntoAcct(SAcctObj *pAcct, SUserObj *pUser) {
return 0;
}
int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) {
int32_t acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) {
pthread_mutex_lock(&pAcct->mutex);
if (pUser->prev) {
pUser->prev->next = pUser->next;
......@@ -88,7 +96,7 @@ int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) {
if (pUser->next) {
pUser->next->prev = pUser->prev;
}
if (pUser->prev == NULL) {
pAcct->pUser = pUser->next;
}
......@@ -97,51 +105,4 @@ int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) {
pthread_mutex_unlock(&pAcct->mutex);
return 0;
}
int32_t mgmtInitAccts() {
if (mgmtInitAcctsFp) {
return (*mgmtInitAcctsFp)();
} else {
tsAcctObj.acctId = 0;
strcpy(tsAcctObj.user, "root");
return 0;
}
}
SAcctObj *mgmtGetAcct(char *acctName) {
if (mgmtGetAcctFp) {
return (*mgmtGetAcctFp)(acctName);
} else {
return &tsAcctObj;
}
}
void mgmtCleanUpAccts() {
if (mgmtCleanUpAcctsFp) {
(*mgmtCleanUpAcctsFp)();
}
}
int32_t mgmtCheckUserLimit(SAcctObj *pAcct) {
if (mgmtCheckUserLimitFp) {
return (*mgmtCheckUserLimitFp)(pAcct);
}
return 0;
}
int32_t mgmtCheckDbLimit(SAcctObj *pAcct) {
if (mgmtCheckDbLimitFp) {
return (*mgmtCheckDbLimitFp)(pAcct);
} else {
return 0;
}
}
int32_t mgmtCheckTableLimit(SAcctObj *pAcct, int32_t numOfTimeSeries) {
if (mgmtCheckTableLimitFp) {
return (*mgmtCheckTableLimitFp)(pAcct, numOfTimeSeries);
} else {
return 0;
}
}
\ No newline at end of file
......@@ -48,7 +48,7 @@
#include "mgmtVgroup.h"
#include "mgmtUser.h"
static void *tsChildTableSdb;
void *tsChildTableSdb;
static int32_t tsChildTableUpdateSize;
static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg);
static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg);
......@@ -84,7 +84,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
return TSDB_CODE_INVALID_DB;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct);
return TSDB_CODE_INVALID_ACCT;
......@@ -93,9 +93,11 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
if (pTable->info.type == TSDB_CHILD_TABLE) {
pTable->superTable = mgmtGetSuperTable(pTable->superTableId);
pTable->superTable->numOfTables++;
mgmtAddTimeSeries(pAcct, pTable->superTable->numOfColumns - 1);
grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
} else {
mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1);
grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
}
mgmtAddTableIntoDb(pDb);
mgmtAddTableIntoVgroup(pVgroup, pTable);
......@@ -120,17 +122,19 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) {
return TSDB_CODE_INVALID_DB;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct);
return TSDB_CODE_INVALID_ACCT;
}
if (pTable->info.type == TSDB_CHILD_TABLE) {
mgmtRestoreTimeSeries(pAcct, pTable->superTable->numOfColumns - 1);
grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
pTable->superTable->numOfTables--;
} else {
mgmtRestoreTimeSeries(pAcct, pTable->numOfColumns - 1);
grantRestore(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
}
mgmtRemoveTableFromDb(pDb);
mgmtRemoveTableFromVgroup(pVgroup, pTable);
......@@ -464,9 +468,9 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
void mgmtCreateChildTable(SQueuedMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont;
int32_t code = mgmtCheckTimeSeries(htons(pCreate->numOfColumns));
int32_t code = grantCheck(TSDB_GRANT_TIMESERIES);
if (code != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create, timeseries exceed the limit", pCreate->tableId);
mError("table:%s, failed to create, grant not", pCreate->tableId);
mgmtSendSimpleResp(pMsg->thandle, code);
return;
}
......@@ -634,7 +638,7 @@ static int32_t mgmtAddNormalTableColumn(SChildTableObj *pTable, SSchema schema[]
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
......@@ -677,7 +681,7 @@ static int32_t mgmtDropNormalTableColumnByName(SChildTableObj *pTable, char *col
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR;
......
......@@ -35,7 +35,7 @@
#include "mgmtUser.h"
#include "mgmtVgroup.h"
static void *tsDbSdb = NULL;
void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
......@@ -54,7 +54,7 @@ static int32_t mgmtDbActionDestroy(SSdbOperDesc *pOper) {
static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) {
SDbObj *pDb = pOper->pObj;
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
pDb->pHead = NULL;
pDb->pTail = NULL;
......@@ -65,7 +65,7 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) {
pDb->numOfSuperTables = 0;
if (pAcct != NULL) {
mgmtAddDbIntoAcct(pAcct, pDb);
acctAddDb(pAcct, pDb);
}
else {
mError("db:%s, acct:%s info not exist in sdb", pDb->name, pDb->cfg.acct);
......@@ -77,9 +77,9 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) {
static int32_t mgmtDbActionDelete(SSdbOperDesc *pOper) {
SDbObj *pDb = pOper->pObj;
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
mgmtRemoveDbFromAcct(pAcct, pDb);
acctRemoveDb(pAcct, pDb);
mgmtDropAllChildTables(pDb);
mgmtDropAllSuperTables(pDb);
mgmtDropAllVgroups(pDb);
......@@ -277,7 +277,7 @@ static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) {
}
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
int32_t code = mgmtCheckDbLimit(pAcct);
int32_t code = acctCheck(pAcct, TSDB_ACCT_DB);
if (code != 0) {
return code;
}
......@@ -292,7 +292,7 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
assert(pCreate->daysToKeep1 <= pCreate->daysToKeep2 && pCreate->daysToKeep2 <= pCreate->daysToKeep);
code = mgmtCheckDbGrant();
code = grantCheck(TSDB_GRANT_DB);
if (code != 0) {
return code;
}
......@@ -692,7 +692,7 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock);
int32_t code;
if (mgmtCheckExpired()) {
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_GRANT_EXPIRED;
} else if (!pMsg->pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
......@@ -771,7 +771,7 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
SCMAlterDbMsg *pAlter = pMsg->pCont;
mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle);
if (mgmtCheckExpired()) {
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to alter, grant expired", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED);
return;
......@@ -842,7 +842,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
SCMDropDbMsg *pDrop = pMsg->pCont;
mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle);
if (mgmtCheckExpired()) {
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to drop, grant expired", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED);
return;
......@@ -899,3 +899,20 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
newMsg->ahandle = pDb;
taosTmrReset(mgmtDropDb, 10, newMsg, tsMgmtTmr, &tmpTmr);
}
void mgmtDropAllDbs(SAcctObj *pAcct) {
int32_t numOfDbs = 0;
SDbObj *pDb = NULL;
while (1) {
void *pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb);
if (pDb == NULL) break;
if (pDb->pAcct == pAcct) {
mgmtSetDbDirty(pDb);
numOfDbs++;
}
}
mTrace("acct:%s, all dbs is is set dirty", pAcct->acctId, numOfDbs);
}
\ No newline at end of file
......@@ -26,454 +26,76 @@
#include "mgmtUser.h"
#include "mgmtVgroup.h"
int32_t (*mgmtInitDnodesFp)() = NULL;
void (*mgmtCleanUpDnodesFp)() = NULL;
SDnodeObj *(*mgmtGetDnodeFp)(uint32_t ip) = NULL;
SDnodeObj *(*mgmtGetDnodeByIpFp)(int32_t dnodeId) = NULL;
int32_t (*mgmtGetDnodesNumFp)() = NULL;
int32_t (*mgmtUpdateDnodeFp)(SDnodeObj *pDnode) = NULL;
void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL;
void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL;
static SDnodeObj tsDnodeObj = {0};
static void * mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode);
static bool mgmtCheckConfigShow(SGlobalConfig *cfg);
static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg);
static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) ;
static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg);
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore;
maxVnodes = maxVnodes > TSDB_MAX_VNODES ? TSDB_MAX_VNODES : maxVnodes;
maxVnodes = maxVnodes < TSDB_MIN_VNODES ? TSDB_MIN_VNODES : maxVnodes;
if (pDnode->numOfTotalVnodes == 0) {
pDnode->numOfTotalVnodes = maxVnodes;
}
if (pDnode->alternativeRole == TSDB_DNODE_ROLE_MGMT) {
pDnode->numOfTotalVnodes = 0;
}
pDnode->openVnodes = 0;
pDnode->status = TSDB_DN_STATUS_OFFLINE;
mgmtUpdateDnode(pDnode);
}
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
uint32_t status = pDnode->moduleStatus & (1 << moduleType);
return status > 0;
}
int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "IP");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "module type");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "module status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = 0;
SDnodeObj *pDnode = NULL;
while (1) {
pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
if (mgmtCheckModuleInDnode(pDnode, moduleType)) {
pShow->numOfRows++;
}
}
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
return 0;
}
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int32_t cols = 0;
char ipstr[20];
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
if (!mgmtCheckModuleInDnode(pDnode, moduleType)) {
continue;
}
cols = 0;
tinet_ntoa(ipstr, pDnode->privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, tsModule[moduleType].name);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetDnodeStatusStr(pDnode->status) );
cols++;
numOfRows++;
}
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = TSDB_CFG_OPTION_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "config name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_CFG_VALUE_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "config value");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 0;
for (int32_t i = tsGlobalConfigNum - 1; i >= 0; --i) {
SGlobalConfig *cfg = tsGlobalConfig + i;
if (!mgmtCheckConfigShow(cfg)) continue;
pShow->numOfRows++;
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
return 0;
}
static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) {
SGlobalConfig *cfg = tsGlobalConfig + i;
if (!mgmtCheckConfigShow(cfg)) continue;
char *pWrite;
int32_t cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(pWrite, TSDB_CFG_OPTION_LEN, "%s", cfg->option);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
switch (cfg->valType) {
case TSDB_CFG_VTYPE_SHORT:
snprintf(pWrite, TSDB_CFG_VALUE_LEN, "%d", *((int16_t *)cfg->ptr));
numOfRows++;
break;
case TSDB_CFG_VTYPE_INT:
snprintf(pWrite, TSDB_CFG_VALUE_LEN, "%d", *((int32_t *)cfg->ptr));
numOfRows++;
break;
case TSDB_CFG_VTYPE_UINT:
snprintf(pWrite, TSDB_CFG_VALUE_LEN, "%d", *((uint32_t *)cfg->ptr));
numOfRows++;
break;
case TSDB_CFG_VTYPE_FLOAT:
snprintf(pWrite, TSDB_CFG_VALUE_LEN, "%f", *((float *)cfg->ptr));
numOfRows++;
break;
case TSDB_CFG_VTYPE_STRING:
case TSDB_CFG_VTYPE_IPSTR:
case TSDB_CFG_VTYPE_DIRECTORY:
snprintf(pWrite, TSDB_CFG_VALUE_LEN, "%s", (char *)cfg->ptr);
numOfRows++;
break;
default:
break;
}
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "vnode");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 12;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 12;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "sync_status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
SDnodeObj *pDnode = NULL;
if (pShow->payloadLen > 0 ) {
uint32_t ip = ip2uint(pShow->payload);
pDnode = mgmtGetDnodeByIp(ip);
if (NULL == pDnode) {
return TSDB_CODE_NODE_OFFLINE;
}
SVnodeLoad* pVnode;
pShow->numOfRows = 0;
for (int32_t i = 0 ; i < TSDB_MAX_VNODES; i++) {
pVnode = &pDnode->vload[i];
if (0 != pVnode->vgId) {
pShow->numOfRows++;
}
}
pShow->pNode = pDnode;
} else {
while (true) {
pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
pShow->numOfRows += pDnode->openVnodes;
if (0 == pShow->numOfRows) return TSDB_CODE_NODE_OFFLINE;
}
pShow->pNode = NULL;
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int32_t cols = 0;
if (0 == rows) return 0;
if (pShow->payloadLen) {
// output the vnodes info of the designated dnode. And output all vnodes of this dnode, instead of rows (max 100)
pDnode = (SDnodeObj *)(pShow->pNode);
if (pDnode != NULL) {
SVnodeLoad* pVnode;
for (int32_t i = 0 ; i < TSDB_MAX_VNODES; i++) {
pVnode = &pDnode->vload[i];
if (0 == pVnode->vgId) {
continue;
}
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = pVnode->vgId;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetVnodeStatusStr(pVnode->status));
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetVnodeSyncStatusStr(pVnode->syncStatus));
cols++;
numOfRows++;
}
}
} else {
// TODO: output all vnodes of all dnodes
numOfRows = 0;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
extern int32_t clusterInit();
extern void clusterCleanUp();
extern int32_t clusterGetDnodesNum();
extern SDnodeObj* clusterGetDnode(int32_t dnodeId);
extern SDnodeObj* clusterGetDnodeByIp(uint32_t ip);
static SDnodeObj tsDnodeObj = {0};
int32_t mgmtInitDnodes() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtRetrieveConfigs);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VNODES, mgmtGetVnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, mgmtRetrieveVnodes);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg);
if (mgmtInitDnodesFp) {
return mgmtInitDnodesFp();
} else {
tsDnodeObj.dnodeId = 1;
tsDnodeObj.privateIp = inet_addr(tsPrivateIp);
tsDnodeObj.publicIp = inet_addr(tsPublicIp);
tsDnodeObj.createdTime = taosGetTimestampMs();
tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores;
tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY;
tsDnodeObj.status = TSDB_DN_STATUS_OFFLINE;
tsDnodeObj.lastReboot = taosGetTimestampSec();
sprintf(tsDnodeObj.dnodeName, "%d", tsDnodeObj.dnodeId);
mgmtSetDnodeMaxVnodes(&tsDnodeObj);
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT);
if (tsEnableHttpModule) {
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_HTTP);
}
if (tsEnableMonitorModule) {
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MONITOR);
}
return 0;
#ifdef _CLUSTER
return clusterInit();
#else
tsDnodeObj.dnodeId = 1;
tsDnodeObj.privateIp = inet_addr(tsPrivateIp);
tsDnodeObj.publicIp = inet_addr(tsPublicIp);
tsDnodeObj.createdTime = taosGetTimestampMs();
tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
tsDnodeObj.status = TSDB_DN_STATUS_OFFLINE;
tsDnodeObj.lastReboot = taosGetTimestampSec();
sprintf(tsDnodeObj.dnodeName, "%d", tsDnodeObj.dnodeId);
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT);
if (tsEnableHttpModule) {
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_HTTP);
}
if (tsEnableMonitorModule) {
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MONITOR);
}
return 0;
#endif
}
void mgmtCleanUpDnodes() {
if (mgmtCleanUpDnodesFp) {
(*mgmtCleanUpDnodesFp)();
}
#ifdef _CLUSTER
clusterCleanUp();
#endif
}
SDnodeObj *mgmtGetDnode(int32_t dnodeId) {
if (mgmtGetDnodeFp) {
return (*mgmtGetDnodeFp)(dnodeId);
}
#ifdef _CLUSTER
return clusterGetDnode(dnodeId);
#else
if (dnodeId == 1) {
return &tsDnodeObj;
} else {
return NULL;
}
return NULL;
#endif
}
SDnodeObj *mgmtGetDnodeByIp(uint32_t ip) {
if (mgmtGetDnodeByIpFp) {
return (*mgmtGetDnodeByIpFp)(ip);
}
#ifdef _CLUSTER
return clusterGetDnodeByIp(ip);
#else
return &tsDnodeObj;
#endif
}
int32_t mgmtGetDnodesNum() {
if (mgmtGetDnodesNumFp) {
return (*mgmtGetDnodesNumFp)();
} else {
return 1;
}
}
int32_t mgmtUpdateDnode(SDnodeObj *pDnode) {
if (mgmtUpdateDnodeFp) {
return (*mgmtUpdateDnodeFp)(pDnode);
} else {
return 0;
}
}
void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) {
if (mgmtGetNextDnodeFp) {
return (*mgmtGetNextDnodeFp)(pShow, pDnode);
} else {
if (*pDnode == NULL) {
*pDnode = &tsDnodeObj;
} else {
*pDnode = NULL;
}
}
return *pDnode;
}
void mgmtSetDnodeUnRemove(SDnodeObj *pDnode) {
if (mgmtSetDnodeUnRemoveFp) {
(*mgmtSetDnodeUnRemoveFp)(pDnode);
}
}
bool mgmtCheckConfigShow(SGlobalConfig *cfg) {
if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW))
return false;
return true;
}
bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode) {
return pDnode->lbStatus == TSDB_DN_LB_STATUS_OFFLINE_REMOVING || pDnode->lbStatus == TSDB_DN_LB_STATE_SHELL_REMOVING;
}
bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) {
return pDnode->status == TSDB_DN_STATUS_OFFLINE;
#ifdef _CLUSTER
return clusterGetDnodesNum();
#else
return 1;
#endif
}
void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
......@@ -553,14 +175,10 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pDnode->numOfCores = htons(pStatus->numOfCores);
pDnode->diskAvailable = pStatus->diskAvailable;
pDnode->alternativeRole = pStatus->alternativeRole;
if (pDnode->numOfTotalVnodes == 0) {
pDnode->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
}
pDnode->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s, ", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
mgmtSetDnodeMaxVnodes(pDnode);
}
int32_t openVnodes = htons(pStatus->openVnodes);
......
......@@ -14,58 +14,18 @@
*/
#define _DEFAULT_SOURCE
#ifndef _GRANT
#include "os.h"
#include "mgmtAcct.h"
int32_t (*mgmtCheckUserGrantFp)() = NULL;
int32_t (*mgmtCheckDbGrantFp)() = NULL;
void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL;
bool (*mgmtCheckExpiredFp)() = NULL;
int32_t mgmtCheckUserGrant() {
if (mgmtCheckUserGrantFp) {
return (*mgmtCheckUserGrantFp)();
} else {
return 0;
}
}
int32_t mgmtCheckDbGrant() {
if (mgmtCheckDbGrantFp) {
return (*mgmtCheckDbGrantFp)();
} else {
return 0;
}
}
void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) {
pAcct->acctInfo.numOfTimeSeries += timeSeriesNum;
if (mgmtAddTimeSeriesFp) {
(*mgmtAddTimeSeriesFp)(timeSeriesNum);
}
}
void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) {
pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum;
if (mgmtRestoreTimeSeriesFp) {
(*mgmtRestoreTimeSeriesFp)(timeSeriesNum);
}
}
int32_t mgmtCheckTimeSeries(uint32_t timeseries) {
if (mgmtCheckTimeSeriesFp) {
return (*mgmtCheckTimeSeriesFp)(timeseries);
} else {
return 0;
}
}
bool mgmtCheckExpired() {
if (mgmtCheckExpiredFp) {
return mgmtCheckExpiredFp();
} else {
return false;
}
}
#include "taoserror.h"
#include "tlog.h"
#include "mgmtGrant.h"
int32_t grantInit() { return TSDB_CODE_SUCCESS; }
void grantCleanUp() {}
void grantParseParameter() { mError("can't parsed parameter k"); }
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
void grantReset(EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {}
#endif
\ No newline at end of file
......@@ -25,6 +25,7 @@
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
#include "mgmtVgroup.h"
......@@ -73,11 +74,16 @@ int32_t mgmtStartSystem() {
return -1;
}
if (mgmtInitAccts() < 0) {
if (acctInit() < 0) {
mError("failed to init accts");
return -1;
}
if (grantInit() < 0) {
mError("failed to init grants");
return -1;
}
if (mgmtInitUsers() < 0) {
mError("failed to init users");
return -1;
......@@ -138,6 +144,7 @@ void mgmtStopSystem() {
void mgmtCleanUpSystem() {
mPrint("starting to clean up mgmt");
grantCleanUp();
mgmtCleanupMnodes();
mgmtCleanupBalance();
mgmtCleanUpShell();
......@@ -148,7 +155,7 @@ void mgmtCleanUpSystem() {
mgmtCleanUpDbs();
mgmtCleanUpDnodes();
mgmtCleanUpUsers();
mgmtCleanUpAccts();
acctCleanUp();
taosTmrCleanUp(tsMgmtTmr);
mPrint("mgmt is cleaned up");
}
......
......@@ -137,7 +137,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return;
}
if (mgmtCheckExpired()) {
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED);
return;
}
......@@ -373,12 +373,12 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
goto connect_over;
}
if (mgmtCheckExpired()) {
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_GRANT_EXPIRED;
goto connect_over;
}
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
SAcctObj *pAcct = acctGetAcct(pUser->acct);
if (pAcct == NULL) {
code = TSDB_CODE_INVALID_ACCT;
goto connect_over;
......
......@@ -253,7 +253,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
......@@ -293,7 +293,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR;
......@@ -381,7 +381,7 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[]
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
......@@ -420,7 +420,7 @@ static int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *col
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR;
......
......@@ -25,10 +25,9 @@
#include "mgmtShell.h"
#include "mgmtUser.h"
static void *tsUserSdb = NULL;
void * tsUserSdb = NULL;
static int32_t tsUserUpdateSize = 0;
static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
static int32_t mgmtDropUser(SAcctObj *pAcct, char *name);
static int32_t mgmtUpdateUser(SUserObj *pUser);
static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
......@@ -45,10 +44,10 @@ static int32_t mgmtUserActionDestroy(SSdbOperDesc *pOper) {
static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) {
SUserObj *pUser = pOper->pObj;
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
SAcctObj *pAcct = acctGetAcct(pUser->acct);
if (pAcct != NULL) {
mgmtAddUserIntoAcct(pAcct, pUser);
acctAddUser(pAcct, pUser);
}
else {
mError("user:%s, acct:%s info not exist in sdb", pUser->user, pUser->acct);
......@@ -60,9 +59,9 @@ static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) {
static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) {
SUserObj *pUser = pOper->pObj;
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
SAcctObj *pAcct = acctGetAcct(pUser->acct);
mgmtRemoveUserFromAcct(pAcct, pUser);
acctRemoveUser(pAcct, pUser);
return TSDB_CODE_SUCCESS;
}
......@@ -115,7 +114,7 @@ int32_t mgmtInitUsers() {
return -1;
}
SAcctObj *pAcct = mgmtGetAcct("root");
SAcctObj *pAcct = acctGetAcct("root");
mgmtCreateUser(pAcct, "root", "taosdata");
mgmtCreateUser(pAcct, "monitor", tsInternalPass);
mgmtCreateUser(pAcct, "_root", tsInternalPass);
......@@ -155,8 +154,8 @@ static int32_t mgmtUpdateUser(SUserObj *pUser) {
return code;
}
static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
int32_t code = mgmtCheckUserLimit(pAcct);
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
int32_t code = acctCheck(pAcct, TSDB_ACCT_USER);
if (code != 0) {
return code;
}
......@@ -171,8 +170,8 @@ static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
return TSDB_CODE_USER_ALREADY_EXIST;
}
code = mgmtCheckUserGrant();
if (code != 0) {
code = grantCheck(TSDB_GRANT_USER);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -482,3 +481,30 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, code);
}
void mgmtDropAllUsers(SAcctObj *pAcct) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfUsers = 0;
int32_t acctNameLen = strlen(pAcct->user);
SUserObj *pUser = NULL;
while (1) {
pNode = sdbFetchRow(tsUserSdb, pNode, (void **)&pUser);
if (pUser == NULL) break;
if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) {
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_LOCAL,
.table = tsUserSdb,
.pObj = pUser,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfUsers++;
continue;
}
}
mTrace("acct:%s, all users is dropped from sdb", pAcct->acctId, numOfUsers);
}
\ No newline at end of file
......@@ -32,7 +32,7 @@
#include "mgmtTable.h"
#include "mgmtVgroup.h"
static void *tsVgroupSdb = NULL;
void *tsVgroupSdb = NULL;
static int32_t tsVgUpdateSize = 0;
static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
......
......@@ -9,25 +9,20 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tutil ${SRC})
TARGET_LINK_LIBRARIES(tutil thirdparty pthread os m rt)
IF (TD_CLUSTER)
FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/)
IF (ICONV_INCLUDE_EXIST)
ADD_DEFINITIONS(-DUSE_LIBICONV)
TARGET_LINK_LIBRARIES(tutil iconv)
ELSE()
FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/)
IF (ICONV_INCLUDE_EXIST)
ADD_DEFINITIONS(-DUSE_LIBICONV)
FIND_PATH(ICONV_LIBRARY_A_EXIST libiconv.a /usr/lib/ /usr/local/lib/ /lib64)
FIND_PATH(ICONV_LIBRARY_SO_EXIST libiconv.so /usr/lib/ /usr/local/lib/ /lib64)
IF (ICONV_LIBRARY_A_EXIST OR ICONV_LIBRARY_SO_EXIST)
MESSAGE(STATUS "Use the installed libiconv library")
TARGET_LINK_LIBRARIES(tutil iconv)
ELSE ()
# libiconv library is already included in GLIBC,
MESSAGE(STATUS "Use the iconv functions in GLIBC")
ENDIF ()
FIND_PATH(ICONV_LIBRARY_A_EXIST libiconv.a /usr/lib/ /usr/local/lib/ /lib64)
FIND_PATH(ICONV_LIBRARY_SO_EXIST libiconv.so /usr/lib/ /usr/local/lib/ /lib64)
IF (ICONV_LIBRARY_A_EXIST OR ICONV_LIBRARY_SO_EXIST)
MESSAGE(STATUS "Use the installed libiconv library")
TARGET_LINK_LIBRARIES(tutil iconv)
ELSE ()
MESSAGE(STATUS "Failed to find iconv, use default encoding method")
# libiconv library is already included in GLIBC,
MESSAGE(STATUS "Use the iconv functions in GLIBC")
ENDIF ()
ELSE ()
MESSAGE(STATUS "Failed to find iconv, use default encoding method")
ENDIF ()
ADD_SUBDIRECTORY(tests)
......
......@@ -81,7 +81,7 @@ float tsRatioOfQueryThreads = 0.5;
char tsPublicIp[TSDB_IPv4ADDR_LEN] = {0};
char tsPrivateIp[TSDB_IPv4ADDR_LEN] = {0};
short tsNumOfVnodesPerCore = 8;
short tsNumOfTotalVnodes = 0;
short tsNumOfTotalVnodes = TSDB_INVALID_VNODE_NUM;
short tsCheckHeaderFile = 0;
#ifdef _TD_ARM_32_
......@@ -189,7 +189,7 @@ int tsEnableCoreFile = 0;
int tsAnyIp = 1;
uint32_t tsPublicIpInt = 0;
#ifdef CLUSTER
#ifdef _CLUSTER
int tsIsCluster = 1;
#else
int tsIsCluster = 0;
......@@ -946,7 +946,7 @@ bool tsReadGlobalConfig() {
if (tsSecondIp[0] == 0) {
strcpy(tsSecondIp, tsMasterIp);
}
taosGetSystemInfo();
tsSetLocale();
......@@ -960,6 +960,12 @@ bool tsReadGlobalConfig() {
tsNumOfCores = 1;
}
if (tsNumOfTotalVnodes == TSDB_INVALID_VNODE_NUM) {
tsNumOfTotalVnodes = tsNumOfCores * tsNumOfVnodesPerCore;
tsNumOfTotalVnodes = tsNumOfTotalVnodes > TSDB_MAX_VNODES ? TSDB_MAX_VNODES : tsNumOfTotalVnodes;
tsNumOfTotalVnodes = tsNumOfTotalVnodes < TSDB_MIN_VNODES ? TSDB_MIN_VNODES : tsNumOfTotalVnodes;
}
if (strlen(tsPrivateIp) == 0) {
pError("privateIp is null");
return false;
......@@ -1052,12 +1058,12 @@ void tsPrintGlobalConfig() {
if (tscEmbedded == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue;
if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue;
if (cfg->cfgType & TSDB_CFG_CTYPE_B_LITE) {
#ifdef CLUSTER
#ifdef _CLUSTER
continue;
#endif
}
if (cfg->cfgType & TSDB_CFG_CTYPE_B_CLUSTER) {
#ifndef CLUSTER
#ifndef _CLUSTER
continue;
#endif
}
......
char version[64] = "1.6.5.4";
char compatible_version[64] = "1.6.1.0";
char version[64] = "2.0.0.0";
char compatible_version[64] = "2.0.0.0";
char gitinfo[128] = "3264067e97300c84caa61ac909d548c9ca56de6b";
char gitinfoOfInternal[128] = "da88f4a2474737d1f9c76adcf0ff7fd0975e7342";
char buildinfo[512] = "Built by root at 2020-02-05 14:38";
char buildinfo[512] = "Built by root at 2020-04-01 14:38";
void libtaos_1_6_5_4_Linux_x64() {};
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(./src SRC)
LIST(REMOVE_ITEM SRC ./src/vnodeFileUtil.c)
LIST(REMOVE_ITEM SRC ./src/taosGrant.c)
ADD_LIBRARY(vnode ${SRC})
IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(vnode vcluster)
ELSEIF (TD_LITE)
TARGET_LINK_LIBRARIES(vnode vlite)
ENDIF ()
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_H
#define TDENGINE_VNODE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "tglobalcfg.h"
#include "tidpool.h"
#include "tlog.h"
#include "tmempool.h"
#include "trpc.h"
#include "tsclient.h"
#include "taosdef.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "vnodeCache.h"
#include "vnodeFile.h"
#include "vnodePeer.h"
#include "vnodeShell.h"
#define TSDB_FILE_HEADER_LEN 512
#define TSDB_FILE_HEADER_VERSION_SIZE 32
#define TSDB_CACHE_POS_BITS 13
#define TSDB_CACHE_POS_MASK 0x1FFF
#define TSDB_ACTION_INSERT 0
#define TSDB_ACTION_IMPORT 1
#define TSDB_ACTION_DELETE 2
#define TSDB_ACTION_UPDATE 3
#define TSDB_ACTION_MAX 4
enum _data_source {
TSDB_DATA_SOURCE_METER,
TSDB_DATA_SOURCE_VNODE,
TSDB_DATA_SOURCE_SHELL,
TSDB_DATA_SOURCE_QUEUE,
TSDB_DATA_SOURCE_LOG,
};
enum _sync_cmd {
TSDB_SYNC_CMD_FILE,
TSDB_SYNC_CMD_CACHE,
TSDB_SYNC_CMD_CREATE,
TSDB_SYNC_CMD_REMOVE,
};
typedef struct {
int64_t offset : 48;
int64_t length : 16;
} SMeterObjHeader;
typedef struct {
int64_t len;
char data[];
} SData;
#pragma pack(push, 8)
typedef struct {
SVnodeStatisticInfo vnodeStatistic;
int vnode;
SVnodeCfg cfg;
// SDiskDesc tierDisk[TSDB_MAX_TIER];
SVPeerDesc vpeers[TSDB_VNODES_SUPPORT];
SVnodePeer * peerInfo[TSDB_VNODES_SUPPORT];
char selfIndex;
char vnodeStatus;
char accessState; // Vnode access state, Readable/Writable
char syncStatus;
char commitInProcess;
pthread_t commitThread;
TSKEY firstKey; // minimum key uncommitted, it may be smaller than
// commitFirstKey
TSKEY commitFirstKey; // minimum key for a commit file, it shall be
// xxxx00000, calculated from fileId
TSKEY commitLastKey; // maximum key for a commit file, it shall be xxxx99999,
// calculated fromm fileId
int commitFileId;
TSKEY lastCreate;
TSKEY lastRemove;
TSKEY lastKey; // last key for the whole vnode, updated by every insert
// operation
uint64_t version;
int streamRole;
int numOfStreams;
void *streamTimer;
TSKEY lastKeyOnFile; // maximum key on the last file, is shall be xxxx99999
int fileId;
int badFileId;
int numOfFiles;
int maxFiles;
int maxFile1;
int maxFile2;
int nfd; // temp head file FD
int hfd; // head file FD
int lfd; // last file FD
int tfd; // temp last file FD
int dfd; // data file FD
int64_t dfSize;
int64_t lfSize;
uint64_t * fmagic; // hold magic number for each file
char cfn[TSDB_FILENAME_LEN];
char nfn[TSDB_FILENAME_LEN];
char lfn[TSDB_FILENAME_LEN]; // last file name
char tfn[TSDB_FILENAME_LEN]; // temp last file name
pthread_mutex_t vmutex;
int logFd;
char * pMem;
char * pWrite;
pthread_mutex_t logMutex;
char logFn[TSDB_FILENAME_LEN];
char logOFn[TSDB_FILENAME_LEN];
int64_t mappingSize;
int64_t mappingThreshold;
void * commitTimer;
void ** meterList;
void * pCachePool;
void * pQueue;
pthread_t thread;
int peersOnline;
int shellConns;
int meterConns;
struct _qinfo *pQInfoList;
TAOS * dbConn;
SMeterObjHeader *meterIndex;
} SVnodeObj;
#pragma pack(pop)
typedef struct SColumn {
short colId;
short bytes;
char type;
} SColumn;
typedef struct _meter_obj {
uint64_t uid;
char meterId[TSDB_TABLE_ID_LEN];
int sid;
short vnode;
short numOfColumns;
short bytesPerPoint;
short maxBytes;
int32_t pointsPerBlock;
int32_t pointsPerFileBlock;
int freePoints;
TSKEY lastKey; // updated by insert operation
TSKEY lastKeyOnFile; // last key on file, updated by commit action
TSKEY timeStamp; // delete or added time
uint64_t commitCount;
int32_t sversion;
short sqlLen;
char searchAlgorithm : 4;
char compAlgorithm : 4;
char status; // 0: ok, 1: stop stream computing
char reserved[16];
int state;
int numOfQueries;
char * pSql;
void * pStream;
void * pCache;
SColumn *schema;
} SMeterObj;
typedef struct {
char type;
char pversion; // protocol version
char action; // insert, import, delete, update
int32_t sversion; // only for insert
int32_t sid;
int32_t len;
uint64_t lastVersion; // latest version
char cont[];
} SVMsgHeader;
struct tSQLBinaryExpr;
typedef struct SColumnInfoEx {
SColumnInfo data;
int16_t colIdx;
int16_t colIdxInBuf;
/*
* 0: denotes if its is required in the first round of scan of data block
* 1: denotes if its is required in the secondary scan
*/
int16_t req[2];
} SColumnInfoEx;
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem *pFilter, char *val1, char *val2);
typedef struct SColumnFilterElem {
int16_t bytes; // column length
__filter_func_t fp;
SColumnFilterInfo filterInfo;
} SColumnFilterElem;
typedef struct SSingleColumnFilterInfo {
SColumnInfoEx info;
int32_t numOfFilters;
SColumnFilterElem *pFilters;
char * pData;
} SSingleColumnFilterInfo;
typedef struct SQuery {
short numOfCols;
SOrderVal order;
char keyIsMet; // if key is met, it will be set
char over;
int fileId; // only for query in file
int hfd; // only for query in file, head file handle
int dfd; // only for query in file, data file handle
int lfd; // only for query in file, last file handle
SCompBlock *pBlock; // only for query in file
SField ** pFields;
int numOfBlocks; // only for query in file
int blockBufferSize; // length of pBlock buffer
int currentSlot;
int firstSlot;
/*
* the two parameters are utilized to handle the data missing situation, caused by import operation.
* When the commit slot is the first slot, and commitPoints != 0
*/
int32_t commitSlot; // which slot is committed,
int32_t commitPoint; // starting point for next commit
int slot;
int pos;
TSKEY key;
int compBlockLen; // only for import
int64_t blockId;
TSKEY skey;
TSKEY ekey;
int64_t intervalTime;
int64_t slidingTime; // sliding time for sliding window query
char intervalTimeUnit; // interval data type, used for daytime revise
int8_t precision;
int16_t numOfOutputCols;
int16_t interpoType;
int16_t checkBufferInLoop; // check if the buffer is full during scan each block
SLimitVal limit;
int32_t rowSize;
SSqlGroupbyExpr * pGroupbyExpr;
SSqlFunctionExpr * pSelectExpr;
SColumnInfoEx * colList;
int32_t numOfFilterCols;
SSingleColumnFilterInfo *pFilterInfo;
int64_t * defaultVal;
TSKEY lastKey;
// buffer info
int64_t pointsRead; // the number of points returned
int64_t pointsToRead; // maximum number of points to read
int64_t pointsOffset; // the number of points offset to save read data
SData **sdata;
SData * tsData; // timestamp column/primary key column
} SQuery;
typedef struct {
char spi;
char encrypt;
char secret[TSDB_KEY_LEN];
char cipheringKey[TSDB_KEY_LEN];
} SConnSec;
typedef struct {
char * buffer;
char * offset;
int trans;
int bufferSize;
pthread_mutex_t qmutex;
} STranQueue;
// internal globals
extern int tsMeterSizeOnFile;
extern void * tsQueryQhandle;
extern int tsVnodePeers;
extern int tsMaxVnode;
extern int tsMaxQueues;
extern int tsOpenVnodes;
extern SVnodeObj *vnodeList;
extern void * vnodeTmrCtrl;
// read API
extern int (*vnodeSearchKeyFunc[])(char *pValue, int num, TSKEY key, int order);
void *vnodeQueryOnSingleTable(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs,
SQueryMeterMsg *pQueryMsg, int *code);
void *vnodeQueryOnMultiMeters(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SQueryMeterMsg *pQueryMsg, int *code);
// assistant/tool functions
SSqlGroupbyExpr *vnodeCreateGroupbyExpr(SQueryMeterMsg *pQuery, int32_t *code);
SSqlFunctionExpr *vnodeCreateSqlFunctionExpr(SQueryMeterMsg *pQuery, int32_t *code);
bool vnodeValidateExprColumnInfo(SQueryMeterMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg);
bool vnodeIsValidVnodeCfg(SVnodeCfg *pCfg);
int32_t vnodeGetResultSize(void *handle, int32_t *numOfRows);
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows);
int64_t vnodeGetOffsetVal(void *thandle);
bool vnodeHasRemainResults(void *handle);
int vnodeRetrieveQueryResult(void *handle, int *pNum, char *argv[]);
int vnodeSaveQueryResult(void *handle, char *data, int32_t* size);
int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t *timePrec);
void vnodeFreeQInfo(void *, bool);
void vnodeFreeQInfoInQueue(void *param);
bool vnodeIsQInfoValid(void *param);
void vnodeDecRefCount(void *param);
void vnodeAddRefCount(void *param);
int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery);
void vnodeQueryData(SSchedMsg *pMsg);
// meter API
int vnodeOpenMetersVnode(int vnode);
void vnodeCloseMetersVnode(int vnode);
int vnodeCreateMeterObj(SMeterObj *pNew, SConnSec *pSec);
int vnodeRemoveMeterObj(int vnode, int sid);
int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints, TSKEY now);
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints, TSKEY now);
int vnodeInsertBufferedPoints(int vnode);
int vnodeSaveAllMeterObjToFile(int vnode);
int vnodeSaveMeterObjToFile(SMeterObj *pObj);
int vnodeSaveVnodeCfg(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc);
int vnodeSaveVnodeInfo(int vnode);
// cache API
void *vnodeOpenCachePool(int vnode);
void vnodeCloseCachePool(int vnode);
void *vnodeAllocateCacheInfo(SMeterObj *pObj);
void vnodeFreeCacheInfo(SMeterObj *pObj);
void vnodeSetCommitQuery(SMeterObj *pObj, SQuery *pQuery);
int vnodeInsertPointToCache(SMeterObj *pObj, char *pData);
int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery);
uint64_t vnodeGetPoolCount(SVnodeObj *pVnode);
void vnodeUpdateCommitInfo(SMeterObj *pObj, int slot, int pos, uint64_t count);
void vnodeCommitOver(SVnodeObj *pVnode);
TSKEY vnodeGetFirstKey(int vnode);
int vnodeSyncRetrieveCache(int vnode, int fd);
int vnodeSyncRestoreCache(int vnode, int fd);
pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode);
void vnodeCancelCommit(SVnodeObj *pVnode);
void vnodeCloseStream(SVnodeObj *pVnode);
void vnodeProcessCommitTimer(void *param, void *tmrId);
void vnodeSearchPointInCache(SMeterObj *pObj, SQuery *pQuery);
int vnodeAllocateCacheBlock(SMeterObj *pObj);
int vnodeFreeCacheBlock(SCacheBlock *pCacheBlock);
int vnodeIsCacheCommitted(SMeterObj *pObj);
// file API
int vnodeInitFile(int vnode);
int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery);
void *vnodeCommitToFile(void *param);
void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid);
int vnodeSyncRetrieveFile(int vnode, int fd, uint32_t fileId, uint64_t *fmagic);
int vnodeSyncRestoreFile(int vnode, int sfd);
int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pBlock, SData *data[], SData *cdata[], int pointsRead);
int vnodeSearchPointInFile(SMeterObj *pObj, SQuery *pQuery);
int vnodeReadCompBlockToMem(SMeterObj *pObj, SQuery *pQuery, SData *sdata[]);
int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast);
void vnodeCloseCommitFiles(SVnodeObj *pVnode);
int vnodeReadLastBlockToMem(SMeterObj *pObj, SCompBlock *pBlock, SData *sdata[]);
// vnode API
void vnodeUpdateStreamRole(SVnodeObj *pVnode);
int vnodeInitPeer(int numOfThreads);
void vnodeCleanUpPeer();
int vnodeOpenPeerVnode(int vnode);
void vnodeClosePeerVnode(int vnode);
void *vnodeGetMeterPeerConnection(SMeterObj *pObj, int index);
int vnodeForwardToPeer(SMeterObj *pObj, char *msg, int msgLen, char action, int sversion);
void vnodeCloseAllSyncFds(int vnode);
void vnodeConfigVPeers(int vnode, int numOfPeers, SVPeerDesc peerDesc[]);
void vnodeStartSyncProcess(SVnodeObj *pVnode);
void vnodeCancelSync(int vnode);
void vnodeListPeerStatus(char *buffer);
void vnodeCheckOwnStatus(SVnodeObj *pVnode);
int vnodeSaveMeterObjToFile(SMeterObj *pObj);
int vnodeRecoverFromPeer(SVnodeObj *pVnode, int fileId);
// vnodes API
int vnodeInitVnodes();
int vnodeInitStore();
void vnodeCleanUpVnodes();
int vnodeRemoveVnode(int vnode);
int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc);
void vnodeOpenStreams(void *param, void *tmrId);
void vnodeCreateStream(SMeterObj *pObj);
void vnodeRemoveStream(SMeterObj *pObj);
// shell API
int vnodeInitShell();
void vnodeCleanUpShell();
int vnodeOpenShellVnode(int vnode);
void vnodeCloseShellVnode(int vnode);
// memter mgmt
int vnodeInitMeterMgmt();
void vnodeCleanUpMeterMgmt();
int vnodeOpenMeterMgmtVnode(int vnode);
int vnodeOpenMeterMgmtStoreVnode(int vnode);
void vnodeCloseMeterMgmtVnode(int vnode);
int vnodeCreateMeterMgmt(SMeterObj *pObj, SConnSec *pSec);
void vnodeRemoveMeterMgmt(SMeterObj *pObj);
SConnSec *vnodeGetMeterSec(int vnode, int sid);
int vnodeCreateMeterObjFile(int vnode);
// mgmt
void vnodeCleanUpMgmt();
int vnodeRetrieveMissedCreateMsg(int vnode, int fd, uint64_t stime);
int vnodeRestoreMissedCreateMsg(int vnode, int fd);
int vnodeRetrieveMissedRemoveMsg(int vid, int fd, uint64_t stime);
int vnodeRestoreMissedRemoveMsg(int vnode, int fd);
int vnodeProcessBufferedCreateMsgs(int vnode);
void vnodeSendVpeerCfgMsg(int vnode);
int vnodeSendMeterCfgMsg(int vnode, int sid);
int vnodeMgmtConns();
void vnodeRemoveFile(int vnode, int fileId);
// commit
int vnodeInitCommit(int vnode);
void vnodeCleanUpCommit(int vnode);
int vnodeRenewCommitLog(int vnode);
void vnodeRemoveCommitLog(int vnode);
int vnodeWriteToCommitLog(SMeterObj *pObj, char action, char *cont, int contLen, int sversion);
extern int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *, TSKEY);
extern int (*pCompFunc[])(const char *const input, int inputSize, const int elements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize);
extern int (*pDecompFunc[])(const char *const input, int compressedSize, const int elements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize);
// global variable and APIs provided by mgmt
extern char mgmtStatus;
extern char tsMgmtDirectory[];
extern const int16_t vnodeFileVersion;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODECACHE_H
#define TDENGINE_VNODECACHE_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
short notFree;
short numOfPoints;
int slot;
int index;
int64_t blockId;
struct _meter_obj *pMeterObj;
char * offset[];
} SCacheBlock;
typedef struct {
int64_t blocks;
int maxBlocks;
int numOfBlocks;
int unCommittedBlocks;
int32_t currentSlot;
int32_t commitSlot; // which slot is committed
int32_t commitPoint; // starting point for next commit
SCacheBlock **cacheBlocks; // cache block list, circular list
} SCacheInfo;
typedef struct {
int vnode;
char ** pMem;
int64_t freeSlot;
pthread_mutex_t vmutex;
uint64_t count; // kind of transcation ID
int64_t notFreeSlots;
int64_t threshold;
char commitInProcess;
int cacheBlockSize;
int cacheNumOfBlocks;
} SCachePool;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODECACHE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEDATAFILTERFUNC_H
#define TDENGINE_VNODEDATAFILTERFUNC_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnode.h"
__filter_func_t *vnodeGetRangeFilterFuncArray(int32_t type);
__filter_func_t *vnodeGetValueFilterFuncArray(int32_t type);
bool vnodeSupportPrefilter(int32_t type);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEDATAFILTERFUNC_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEFILE_H
#define TDENGINE_VNODEFILE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tchecksum.h"
#define TSDB_VNODE_DELIMITER 0xF00AFA0F
typedef struct { int64_t compInfoOffset; } SCompHeader;
typedef struct {
short colId;
short bytes;
int32_t numOfNullPoints;
int32_t type : 8;
int32_t offset : 24;
int32_t len; // data length
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
char reserved[20];
} SField;
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8; // compression algorithm can be changed
int32_t numOfPoints : 24; // how many points have been written into this block
int32_t sversion;
int32_t len; // total length of this data block
uint16_t numOfCols;
char reserved[16];
TSKEY keyFirst; // time stamp for the first point
TSKEY keyLast; // time stamp for the last point
} SCompBlock;
typedef struct {
SCompBlock *compBlock;
SField * fields;
} SCompBlockFields;
typedef struct {
uint64_t uid;
int64_t last : 1;
int64_t numOfBlocks : 62;
uint32_t delimiter; // delimiter for recovery
TSCKSUM checksum;
SCompBlock compBlocks[]; // comp block list
} SCompInfo;
typedef struct {
int64_t tempHeadOffset;
int64_t compInfoOffset;
int64_t oldCompBlockOffset;
int64_t oldNumOfBlocks;
int64_t newNumOfBlocks;
int64_t finalNumOfBlocks;
int64_t oldCompBlockLen;
int64_t newCompBlockLen;
int64_t finalCompBlockLen;
int64_t committedPoints;
int commitSlot;
int32_t last : 1;
int32_t changed : 1;
int32_t commitPos : 30;
int64_t commitCount;
SCompBlock lastBlock;
} SMeterInfo;
typedef struct { int64_t totalStorage; } SVnodeHeadInfo;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEFILE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEQUERYIMPL_H
#define TDENGINE_VNODEQUERYIMPL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "hash.h"
#include "hashfunc.h"
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query))
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
/*
* set the output buffer page size is 16k
* The page size should be sufficient for at least one output result or intermediate result.
* Some intermediate results may be extremely large, such as top/bottom(100) query.
*/
#define DEFAULT_INTERN_BUF_SIZE 16384L
#define INIT_ALLOCATE_DISK_PAGES 60L
#define DEFAULT_DATA_FILE_MAPPING_PAGES 2L
#define DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE (DEFAULT_DATA_FILE_MAPPING_PAGES * DEFAULT_INTERN_BUF_SIZE)
#define IO_ENGINE_MMAP 0
#define IO_ENGINE_SYNC 1
#define DEFAULT_IO_ENGINE IO_ENGINE_SYNC
/**
* check if the primary column is load by default, otherwise, the program will
* forced to load primary column explicitly.
*/
#define PRIMARY_TSCOL_LOADED(query) ((query)->colList[0].data.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
typedef enum {
/*
* the program will call this function again, if this status is set.
* used to transfer from QUERY_RESBUF_FULL
*/
QUERY_NOT_COMPLETED = 0x1u,
/*
* output buffer is full, so, the next query will be employed,
* in this case, we need to set the appropriated start scan point for
* the next query.
*
* this status is only exist in group-by clause and
* diff/add/division/multiply/ query.
*/
QUERY_RESBUF_FULL = 0x2u,
/*
* query is over
* 1. this status is used in one row result query process, e.g.,
* count/sum/first/last/
* avg...etc.
* 2. when the query range on timestamp is satisfied, it is also denoted as
* query_compeleted
*/
QUERY_COMPLETED = 0x4u,
/*
* all data has been scanned, so current search is stopped,
* At last, the function will transfer this status to QUERY_COMPLETED
*/
QUERY_NO_DATA_TO_CHECK = 0x8u,
} vnodeQueryStatus;
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
char** pPrevPoint;
char** pNextPoint;
} SPointInterpoSupporter;
typedef struct SBlockInfo {
TSKEY keyFirst;
TSKEY keyLast;
int32_t numOfCols;
int32_t size;
} SBlockInfo;
typedef struct SMeterDataBlockInfoEx {
SCompBlockFields pBlock;
SMeterDataInfo* pMeterDataInfo;
int32_t blockIndex;
int32_t groupIdx; /* number of group is less than the total number of meters */
} SMeterDataBlockInfoEx;
typedef enum {
DISK_DATA_LOAD_FAILED = -0x1,
DISK_DATA_LOADED = 0x0,
DISK_DATA_DISCARDED = 0x01,
} vnodeDiskLoadStatus;
#define IS_MASTER_SCAN(runtime) (((runtime)->scanFlag & 1u) == MASTER_SCAN)
#define IS_SUPPLEMENT_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN)
#define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) {
return *(SMeterObj**)taosHashGet(hashHandle, (const char*)&sid, sizeof(sid));
}
bool isQueryKilled(SQuery* pQuery);
bool isFixedOutputQuery(SQuery* pQuery);
bool isPointInterpoQuery(SQuery* pQuery);
bool isSumAvgRateQuery(SQuery *pQuery);
bool isTopBottomQuery(SQuery* pQuery);
bool isFirstLastRowQuery(SQuery* pQuery);
bool isTSCompQuery(SQuery* pQuery);
bool notHasQueryTimeRange(SQuery* pQuery);
bool needSupplementaryScan(SQuery* pQuery);
bool onDemandLoadDatablock(SQuery* pQuery, int16_t queryRangeSet);
void setQueryStatus(SQuery* pQuery, int8_t status);
bool doRevisedResultsByLimit(SQInfo* pQInfo);
void truncateResultByLimit(SQInfo* pQInfo, int64_t* final, int32_t* interpo);
void initCtxOutputBuf(SQueryRuntimeEnv* pRuntimeEnv);
void resetCtxOutputBuf(SQueryRuntimeEnv* pRuntimeEnv);
void forwardCtxOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, int64_t output);
bool needPrimaryTimestampCol(SQuery* pQuery, SBlockInfo* pBlockInfo);
void vnodeScanAllData(SQueryRuntimeEnv* pRuntimeEnv);
int32_t vnodeQueryResultInterpolate(SQInfo* pQInfo, tFilePage** pDst, tFilePage** pDataSrc, int32_t numOfRows,
int32_t* numOfInterpo);
void copyResToQueryResultBuf(STableQuerySupportObj* pSupporter, SQuery* pQuery);
void doSkipResults(SQueryRuntimeEnv* pRuntimeEnv);
void doFinalizeResult(SQueryRuntimeEnv* pRuntimeEnv);
int64_t getNumOfResult(SQueryRuntimeEnv* pRuntimeEnv);
void forwardQueryStartPosition(SQueryRuntimeEnv* pRuntimeEnv);
bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySupportObj* pSupporter,
SPointInterpoSupporter* pPointInterpSupporter, int64_t* key);
void pointInterpSupporterInit(SQuery* pQuery, SPointInterpoSupporter* pInterpoSupport);
void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport);
void pointInterpSupporterSetData(SQInfo* pQInfo, SPointInterpoSupporter* pPointInterpSupport);
int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv* pRuntimeEnv, SPositionInfo* position);
void disableFunctForSuppleScan(STableQuerySupportObj* pSupporter, int32_t order);
void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter);
void copyFromWindowResToSData(SQInfo* pQInfo, SWindowResult* result);
SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv);
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pBlock, int32_t type);
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot);
void stableApplyFunctionsOnBlock(STableQuerySupportObj* pSupporter, SMeterDataInfo* pMeterDataInfo,
SBlockInfo* pBlockInfo, SField* pFields, __block_search_fn_t searchFn);
int32_t vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, tSidSet* pSidSet, SMeterDataInfo* pMeterDataInfo,
int32_t* numOfMeters, SMeterDataInfo*** pReqMeterDataInfo);
int32_t vnodeGetVnodeHeaderFileIndex(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t createDataBlocksInfoEx(SMeterDataInfo** pMeterDataInfo, int32_t numOfMeters,
SMeterDataBlockInfoEx** pDataBlockInfoEx, int32_t numOfCompBlocks,
int32_t* nAllocBlocksInfoSize, int64_t addr);
void freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len);
void setExecutionContext(STableQuerySupportObj* pSupporter, SMeterQueryInfo* pMeterQueryInfo, int32_t meterIdx,
int32_t groupIdx, TSKEY nextKey);
int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, SMeterQueryInfo *pMeterQueryInfo);
void doGetAlignedIntervalQueryRangeImpl(SQuery* pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
int64_t* actualSkey, int64_t* actualEkey, int64_t* skey, int64_t* ekey);
int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange);
int32_t getDataBlocksForMeters(STableQuerySupportObj* pSupporter, SQuery* pQuery, int32_t numOfMeters,
const char* filePath, SMeterDataInfo** pMeterDataInfo, uint32_t* numOfBlocks);
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
int32_t vnodeGetHeaderFile(SQueryRuntimeEnv* pRuntimeEnv, int32_t fileIndex);
/**
* Create SMeterQueryInfo.
* The MeterQueryInfo is created one for each table during super table query
*
* @param skey
* @param ekey
* @return
*/
SMeterQueryInfo* createMeterQueryInfo(STableQuerySupportObj* pSupporter, int32_t sid, TSKEY skey, TSKEY ekey);
/**
* Destroy meter query info
* @param pMeterQInfo
* @param numOfCols
*/
void destroyMeterQueryInfo(SMeterQueryInfo* pMeterQueryInfo, int32_t numOfCols);
/**
* change the meter query info for supplement scan
* @param pMeterQueryInfo
* @param skey
* @param ekey
*/
void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo,
TSKEY skey, TSKEY ekey);
/**
* add the new allocated disk page to meter query info
* the new allocated disk page is used to keep the intermediate (interval) results
* @param pQuery
* @param pMeterQueryInfo
* @param pSupporter
*/
tFilePage* addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo,
STableQuerySupportObj* pSupporter);
/**
* restore the query range data from SMeterQueryInfo to runtime environment
*
* @param pRuntimeEnv
* @param pMeterQueryInfo
*/
void restoreIntervalQueryRange(SQueryRuntimeEnv* pRuntimeEnv, SMeterQueryInfo* pMeterQueryInfo);
/**
* set the interval query range for the interval query, when handling a data(cache) block
*
* @param pMeterQueryInfo
* @param pSupporter
* @param key
*/
void setIntervalQueryRange(SMeterQueryInfo* pMeterQueryInfo, STableQuerySupportObj* pSupporter, int64_t key);
/**
* set the meter data information
* @param pMeterDataInfo
* @param pMeterObj current query meter object
* @param meterIdx meter index in the sid list
* @param groupId group index, which the meter is belonged to
*/
void setMeterDataInfo(SMeterDataInfo* pMeterDataInfo, SMeterObj* pMeterObj, int32_t meterIdx, int32_t groupId);
void vnodeSetTagValueInParam(tSidSet* pSidSet, SQueryRuntimeEnv* pRuntimeEnv, SMeterSidExtInfo* pMeterInfo);
void vnodeCheckIfDataExists(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* pMeterObj, bool* dataInDisk, bool* dataInCache);
void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows);
void vnodePrintQueryStatistics(STableQuerySupportObj* pSupporter);
void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes);
void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type);
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv);
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEQUERYIMPL_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEREAD_H
#define TDENGINE_VNODEREAD_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "qresultBuf.h"
#include "qinterpolation.h"
#include "vnodeTagMgmt.h"
/*
* use to keep the first point position, consisting of position in blk and block
* id, file id
*/
typedef struct {
int32_t pos;
int32_t slot;
int32_t fileId;
} SPositionInfo;
typedef struct SLoadDataBlockInfo {
int32_t fileListIndex; /* index of this file in files list of this vnode */
int32_t fileId;
int32_t slotIdx;
int32_t sid;
bool tsLoaded; // if timestamp column of current block is loaded or not
} SLoadDataBlockInfo;
typedef struct SLoadCompBlockInfo {
int32_t sid; /* meter sid */
int32_t fileId;
int32_t fileListIndex;
} SLoadCompBlockInfo;
/*
* the header file info for one vnode
*/
typedef struct SHeaderFileInfo {
int32_t fileID; // file id
} SHeaderFileInfo;
typedef struct SQueryCostSummary {
double cacheTimeUs;
double fileTimeUs;
int64_t numOfFiles; // opened files during query
int64_t numOfTables; // num of queries tables
int64_t numOfSeek; // number of seek operation
int64_t readDiskBlocks; // accessed disk block
int64_t skippedFileBlocks; // skipped blocks
int64_t blocksInCache; // accessed cache blocks
int64_t readField; // field size
int64_t totalFieldSize; // total read fields size
double loadFieldUs; // total elapsed time to read fields info
int64_t totalBlockSize; // read data blocks
double loadBlocksUs; // total elapsed time to read data blocks
int64_t totalGenData; // in-memory generated data
int64_t readCompInfo; // read compblock info
int64_t totalCompInfoSize; // total comp block size
double loadCompInfoUs; // total elapsed time to read comp block info
int64_t tmpBufferInDisk; // size of buffer for intermediate result
} SQueryCostSummary;
typedef struct SPosInfo {
int16_t pageId;
int16_t rowId;
} SPosInfo;
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
typedef struct SWindowStatus {
bool closed;
} SWindowStatus;
typedef struct SWindowResult {
uint16_t numOfRows;
SPosInfo pos; // Position of current result in disk-based output buffer
SResultInfo* resultInfo; // For each result column, there is a resultInfo
STimeWindow window; // The time window that current result covers.
SWindowStatus status;
} SWindowResult;
/*
* header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function
*/
typedef struct SQueryFilesInfo {
SHeaderFileInfo* pFileInfo;
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
int32_t vnodeId;
int32_t headerFd; // header file fd
int64_t headerFileSize;
int32_t dataFd;
int32_t lastFd;
char headerFilePath[PATH_MAX]; // current opened header file name
char dataFilePath[PATH_MAX]; // current opened data file name
char lastFilePath[PATH_MAX]; // current opened last file path
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct SWindowResInfo {
SWindowResult* pResult; // reference to SQuerySupporter->pResult
void* hashList; // hash list for quick access
int16_t type; // data type for hash key
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
int32_t size;
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
int64_t threshold; // threshold for return completed results.
} SWindowResInfo;
typedef struct SQueryRuntimeEnv {
SPositionInfo startPos; /* the start position, used for secondary/third iteration */
SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */
SPositionInfo nextPos; /* start position of the next scan */
SData* colDataBuffer[TSDB_MAX_COLUMNS];
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
uint8_t blockStatus; // Indicate if data block is loaded, the block is first/last/internal block
int32_t unzipBufSize;
SData* primaryColBuffer;
char* unzipBuffer;
char* secondaryUnzipBuffer;
SQuery* pQuery;
SMeterObj* pMeterObj;
SQLFunctionCtx* pCtx;
SLoadDataBlockInfo loadBlockInfo; /* record current block load information */
SLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
SQueryFilesInfo vnodeFileInfo;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SWindowResInfo windowResInfo;
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
bool stableQuery; // is super table query or not
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
/*
* Temporarily hold the in-memory cache block info during scan cache blocks
* Here we do not use the cache block info from pMeterObj, simple because it may change anytime
* during the query by the submit/insert handling threads.
* So we keep a copy of the support structure as well as the cache block data itself.
*/
SCacheBlock cacheBlock;
} SQueryRuntimeEnv;
/* intermediate pos during multimeter query involves interval */
typedef struct SMeterQueryInfo {
int64_t lastKey;
int64_t skey;
int64_t ekey;
int32_t numOfRes;
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag;
STSCursor cur;
int32_t sid; // for retrieve the page id list
SWindowResInfo windowResInfo;
} SMeterQueryInfo;
typedef struct SMeterDataInfo {
uint64_t offsetInHeaderFile;
int32_t numOfBlocks;
int32_t start; // start block index
SCompBlock* pBlock;
int32_t meterOrderIdx;
SMeterObj* pMeterObj;
int32_t groupIdx; // group id in meter list
SMeterQueryInfo* pMeterQInfo;
} SMeterDataInfo;
typedef struct STableQuerySupportObj {
void* pMetersHashTable; // meter table hash list
SMeterSidExtInfo** pMeterSidExtInfo;
int32_t numOfMeters;
/*
* multimeter query resultset.
* In multimeter queries, the result is temporarily stored on this structure, instead of
* directly put result into output buffer, since we have no idea how many number of
* rows may be generated by a specific subgroup. When query on all subgroups is executed,
* the result is copy to output buffer. This attribution is not used during single meter query processing.
*/
SQueryRuntimeEnv runtimeEnv;
int64_t rawSKey;
int64_t rawEKey;
int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */
tSidSet* pSidSet;
/*
* the query is executed position on which meter of the whole list.
* when the index reaches the last one of the list, it means the query is completed.
* We later may refactor to remove this attribution by using another flag to denote
* whether a multimeter query is completed or not.
*/
int32_t meterIdx;
int32_t numOfGroupResultPages;
int32_t groupResultSize;
SMeterDataInfo* pMeterDataInfo;
TSKEY* tsList;
} STableQuerySupportObj;
typedef struct _qinfo {
uint64_t signature;
int32_t refCount; // QInfo reference count, when the value is 0, it can be released safely
char user[TSDB_TABLE_ID_LEN + 1];
char sql[TSDB_SHOW_SQL_LEN];
uint8_t stream;
uint16_t port;
uint32_t ip;
uint64_t startTime;
int64_t useconds;
int killed;
struct _qinfo *prev, *next;
SQuery query;
int totalPoints;
int pointsRead;
int pointsReturned;
int pointsInterpo;
int code;
char bufIndex;
char changed;
char over;
SMeterObj* pObj;
sem_t dataReady;
STableQuerySupportObj* pTableQuerySupporter;
int (*fp)(SMeterObj*, SQuery*);
} SQInfo;
int32_t vnodeQueryTablePrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, STableQuerySupportObj* pSMultiMeterObj,
void* param);
void vnodeQueryFreeQInfoEx(SQInfo* pQInfo);
bool vnodeParametersSafetyCheck(SQuery* pQuery);
int32_t vnodeSTableQueryPrepare(SQInfo* pQInfo, SQuery* pQuery, void* param);
/**
* decrease the numofQuery of each table that is queried, enable the
* remove/close operation can be executed
* @param pQInfo
*/
void vnodeDecMeterRefcnt(SQInfo* pQInfo);
/* sql query handle in dnode */
void vnodeSingleTableQuery(SSchedMsg* pMsg);
/*
* handle multi-meter query process
*/
void vnodeMultiMeterQuery(SSchedMsg* pMsg);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEREAD_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODESHELL_H
#define TDENGINE_VNODESHELL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODESHELL_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODESTORE_H
#define TDENGINE_VNODESTORE_H
#ifdef __cplusplus
extern "C" {
#endif
void vnodeProcessDataFromVnode(SIntMsg *msg, void *tcpHandle);
void vnodeCalcOpenVnodes();
bool vnodeRemoveDataFileFromLinkFile(char* linkFile, char* de_name);
int vnodeInitInfo();
#ifdef __cplusplus
}
#endif
#endif // TDEGINE_VNODESTORE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_MNODE_SUPER_TABLE_QUERY_H
#define TBASE_MNODE_SUPER_TABLE_QUERY_H
#include "os.h"
#include "mnode.h"
#include "qast.h"
int32_t mgmtDoJoin(SSuperTableMetaMsg* pSuperTableMetaMsg, tQueryResultset* pRes);
void mgmtReorganizeMetersInMetricMeta(SSuperTableMetaMsg* pInfo, int32_t index, tQueryResultset* pRes);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODESYSTEM_H
#define TDENGINE_VNODESYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODESYSTEM_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODETAGMGMT_H
#define TDENGINE_VNODETAGMGMT_H
#ifdef __cplusplus
extern "C" {
#endif
/*
* @version 0.1
* @date 2018/01/02
* @author liaohj
* management of the tag value of tables
* in query, client need the vnode to aggregate results according to tags
* values,
* the grouping operation is done here.
* Note:
* 1. we implement a quick sort algorithm, may remove it later.
*/
typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, void *param);
tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOfMeters, SSchema *pSchema,
int32_t numOfTags, SColIndexEx *colList, int32_t numOfOrderCols);
int32_t *calculateSubGroup(void **pSids, int32_t numOfMeters, int32_t *numOfSubset, tOrderDescriptor *pOrderDesc,
__ext_compar_fn_t compareFn);
void tSidSetDestroy(tSidSet **pSets);
void tSidSetSort(tSidSet *pSets);
int32_t meterSidComparator(const void *s1, const void *s2, void *param);
int32_t doCompare(char *f1, char *f2, int32_t type, int32_t size);
void tQSortEx(void **pMeterSids, size_t size, int32_t start, int32_t end, void *param, __ext_compar_fn_t compareFn);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODETAGMGMT_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_UTIL_H
#define TDENGINE_VNODE_UTIL_H
#ifdef __cplusplus
extern "C" {
#endif
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.bytes)
#define GET_COLUMN_TYPE(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.type)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define EXTRA_BYTES 2 // for possible compression deflation
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index)*(step))
int vnodeGetEid(int days);
int vnodeCheckFileIntegrity(FILE *fp);
void vnodeCreateFileHeader(FILE *fp);
void vnodeCreateFileHeaderFd(int fd);
void vnodeGetHeadFileHeaderInfo(int fd, SVnodeHeadInfo *pHeadInfo);
void vnodeUpdateHeadFileHeader(int fd, SVnodeHeadInfo *pHeadInfo);
/**
* check if two schema is identical or not
* This function does not check if a schema is valid or not
*
* @param pSSchemaFirst
* @param numOfCols1
* @param pSSchemaSecond
* @param numOfCols2
* @return
*/
bool vnodeMeterSchemaIdentical(SColumn *pSchema1, int32_t numOfCols1, SColumn *pSchema2, int32_t numOfCols2);
/**
* free SFields in SQuery
* vnodeFreeFields must be called before free(pQuery->pBlock);
* @param pQuery
*/
void vnodeFreeFields(SQuery *pQuery);
void vnodeUpdateFilterColumnIndex(SQuery* pQuery);
void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj);
int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery *pQuery);
bool vnodeFilterData(SQuery* pQuery, int32_t* numOfActualRead, int32_t index);
bool vnodeDoFilterData(SQuery* pQuery, int32_t elemPos);
bool vnodeIsProjectionQuery(SSqlFunctionExpr *pExpr, int32_t numOfOutput);
int32_t vnodeIncQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterSidExtInfo **pSids, SMeterObj **pMeterObjList,
int32_t *numOfInc);
void vnodeDecQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterObj **pMeterObjList, int32_t numOfInc);
int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state);
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeSetMeterDeleting(SMeterObj* pMeterObj);
int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st);
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid);
void vnodeFreeColumnInfo(SColumnInfo* pColumnInfo);
bool isGroupbyNormalCol(SSqlGroupbyExpr* pExpr);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODE_UTIL_H
此差异已折叠。
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vnode.h"
#include "vnodeFile.h"
char* vnodeGetDiskFromHeadFile(char *headName) { return tsDirectory; }
char* vnodeGetDataDir(int vnode, int fileId) { return dataDir; }
void vnodeAdustVnodeFile(SVnodeObj *pVnode) {
// Retention policy here
int fileId = pVnode->fileId - pVnode->numOfFiles + 1;
int cfile = taosGetTimestamp(pVnode->cfg.precision)/pVnode->cfg.daysPerFile/tsMsPerDay[(uint8_t)pVnode->cfg.precision];
while (fileId <= cfile - pVnode->maxFiles) {
vnodeRemoveFile(pVnode->vnode, fileId);
pVnode->numOfFiles--;
fileId++;
}
}
int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode) {
SCompHeader *pHeader = NULL;
SCompBlock *pBlocks = NULL;
int blockSize = 0;
SCompInfo compInfo;
int tmsize = 0;
tmsize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
pHeader = (SCompHeader *)malloc(tmsize);
if (pHeader == NULL) return 0;
lseek(fd, TSDB_FILE_HEADER_LEN, SEEK_SET);
if (read(fd, (void *)pHeader, tmsize) != tmsize) {
goto _broken_exit;
}
if (!taosCheckChecksumWhole((uint8_t *)pHeader, tmsize)) {
goto _broken_exit;
}
for (int sid = 0; sid < pVnode->cfg.maxSessions; sid++) {
if (pVnode->meterList == NULL) goto _correct_exit;
if (pVnode->meterList[sid] == NULL || pHeader[sid].compInfoOffset == 0) continue;
lseek(fd, pHeader[sid].compInfoOffset, SEEK_SET);
if (read(fd, (void *)(&compInfo), sizeof(SCompInfo)) != sizeof(SCompInfo)) {
goto _broken_exit;
}
if (!taosCheckChecksumWhole((uint8_t *)(&compInfo), sizeof(SCompInfo))) {
goto _broken_exit;
}
if (compInfo.uid != ((SMeterObj *)pVnode->meterList[sid])->uid) continue;
int expectedSize = sizeof(SCompBlock) * compInfo.numOfBlocks + sizeof(TSCKSUM);
if (blockSize < expectedSize) {
pBlocks = (SCompBlock *)realloc(pBlocks, expectedSize);
if (pBlocks == NULL) {
tfree(pHeader);
return 0;
}
blockSize = expectedSize;
}
if (read(fd, (void *)pBlocks, expectedSize) != expectedSize) {
dError("failed to read block part");
goto _broken_exit;
}
if (!taosCheckChecksumWhole((uint8_t *)pBlocks, expectedSize)) {
dError("block part is broken");
goto _broken_exit;
}
for (int i = 0; i < compInfo.numOfBlocks; i++) {
if (pBlocks[i].last && i != compInfo.numOfBlocks-1) {
dError("last block in middle, block:%d", i);
goto _broken_exit;
}
}
}
_correct_exit:
dPrint("vid: %d new header file %s is correct", pVnode->vnode, pVnode->nfn);
tfree(pBlocks);
tfree(pHeader);
return 0;
_broken_exit:
dError("vid: %d new header file %s is broken", pVnode->vnode, pVnode->nfn);
tfree(pBlocks);
tfree(pHeader);
return -1;
}
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include <stdbool.h>
int vnodeInitInfo() { return 0; }
bool vnodeRemoveDataFileFromLinkFile(char* linkFile, char* de_name) { return true; }
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册