提交 6d8a2be8 编写于 作者: T tangfangzhi

fix: rebase conflicts

...@@ -47,31 +47,74 @@ def pre_test(){ ...@@ -47,31 +47,74 @@ def pre_test(){
script { script {
if (env.CHANGE_TARGET == 'master') { if (env.CHANGE_TARGET == 'master') {
sh ''' sh '''
cd ${WK}
git checkout master
cd ${WKC} cd ${WKC}
git checkout master git checkout master
''' '''
} else if(env.CHANGE_TARGET == '2.0') { } else if(env.CHANGE_TARGET == '2.0') {
sh ''' sh '''
cd ${WK}
git checkout 2.0
cd ${WKC} cd ${WKC}
git checkout 2.0 git checkout 2.0
''' '''
} else if(env.CHANGE_TARGET == '3.0') { } else if(env.CHANGE_TARGET == '3.0') {
sh ''' sh '''
cd ${WK}
git checkout 3.0
cd ${WKC} cd ${WKC}
git checkout 3.0 git checkout 3.0
''' '''
} else { } else {
sh ''' sh '''
cd ${WK}
git checkout develop
cd ${WKC} cd ${WKC}
git checkout develop git checkout develop
''' '''
} }
} }
if (env.CHANGE_URL =~ /\/TDengine\//) {
sh ''' sh '''
cd ${WKC} cd ${WKC}
git pull >/dev/null git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
git log|head -n20
cd ${WK}
git pull >/dev/null
git log|head -n20
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh '''
cd ${WK}
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log|head -n20
cd ${WKC}
git pull >/dev/null
git log|head -n20
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
'''
}
sh '''
cd ${WKC}
git submodule update --init --recursive
'''
sh '''
cd ${WK}
export TZ=Asia/Harbin
date
rm -rf debug
mkdir debug
cd debug
cmake .. > /dev/null
make -j4> /dev/null
''' '''
sh ''' sh '''
cd ${WKPY} cd ${WKPY}
...@@ -137,8 +180,8 @@ def pre_test_build_win() { ...@@ -137,8 +180,8 @@ def pre_test_build_win() {
cd debug cd debug
call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64 call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64
set CL=/MP8 set CL=/MP8
cmake ../ -G "NMake Makefiles" cmake .. -G "NMake Makefiles JOM"
nmake || exit 8 jom -j 4 || exit 8
time /t time /t
''' '''
return 1 return 1
...@@ -148,20 +191,14 @@ pipeline { ...@@ -148,20 +191,14 @@ pipeline {
agent none agent none
options { skipDefaultCheckout() } options { skipDefaultCheckout() }
environment{ environment{
WKDIR = '/var/lib/jenkins/workspace/v3.0' WKDIR = '/var/lib/jenkins/workspace'
WKC= '/var/lib/jenkins/workspace/v3.0/TDengine' WK = '/var/lib/jenkins/workspace/TDinternal'
WKPY= '/var/lib/jenkins/workspace/v3.0/taos-connector-python' WKC = '/var/lib/jenkins/workspace/TDinternal/community'
WKPY = '/var/lib/jenkins/workspace/taos-connector-python'
} }
stages { stages {
stage('run test') { stage('run test') {
parallel { parallel {
stage('windows test') {
agent {label " windows11 "}
steps {
pre_test_win()
pre_test_build_win()
}
}
stage('linux test') { stage('linux test') {
agent{label " slave3_0 || slave15 || slave16 || slave17 "} agent{label " slave3_0 || slave15 || slave16 || slave17 "}
options { skipDefaultCheckout() } options { skipDefaultCheckout() }
...@@ -171,6 +208,8 @@ pipeline { ...@@ -171,6 +208,8 @@ pipeline {
steps { steps {
timeout(time: 20, unit: 'MINUTES'){ timeout(time: 20, unit: 'MINUTES'){
pre_test() pre_test()
script {
if (env.CHANGE_URL =~ /\/TDengine\//) {
sh ''' sh '''
cd ${WKC}/tests/parallel_test cd ${WKC}/tests/parallel_test
date date
...@@ -180,6 +219,22 @@ pipeline { ...@@ -180,6 +219,22 @@ pipeline {
date date
time ./run.sh -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log time ./run.sh -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log
''' '''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh '''
cd ${WKC}/tests/parallel_test
date
time ./container_build.sh -w ${WKDIR} -t 8 -e
rm -f /tmp/cases.task
./collect_cases.sh -e
date
time ./run.sh -e -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
'''
}
}
} }
} }
} }
......
# addr2line
ExternalProject_Add(addr2line
GIT_REPOSITORY https://github.com/davea42/libdwarf-addr2line.git
GIT_TAG master
SOURCE_DIR "${TD_CONTRIB_DIR}/addr2line"
BINARY_DIR "${TD_CONTRIB_DIR}/addr2line"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
...@@ -2,6 +2,8 @@ cmake_minimum_required(VERSION 3.16) ...@@ -2,6 +2,8 @@ cmake_minimum_required(VERSION 3.16)
set(CMAKE_VERBOSE_MAKEFILE OFF) set(CMAKE_VERBOSE_MAKEFILE OFF)
SET(BUILD_SHARED_LIBS "OFF")
#set output directory #set output directory
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib) SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/bin) SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/bin)
...@@ -12,6 +14,25 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR}) ...@@ -12,6 +14,25 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH}) MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH})
MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH}) MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH})
find_package(Git QUIET)
if(GIT_FOUND AND EXISTS "${TD_SOURCE_DIR}/.git")
# Update submodules as needed
option(GIT_SUBMODULE "Check submodules during build" ON)
if(GIT_SUBMODULE)
message(STATUS "Submodule update")
execute_process(COMMAND cd ${TD_SOURCE_DIR} && ${GIT_EXECUTABLE} submodule update --init --recursive
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
RESULT_VARIABLE GIT_SUBMOD_RESULT)
if(NOT GIT_SUBMOD_RESULT EQUAL "0")
message(WARNING "git submodule update --init --recursive failed with ${GIT_SUBMOD_RESULT}, please checkout submodules")
endif()
endif()
endif()
if(NOT EXISTS "${TD_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt")
message(WARNING "The submodules were not downloaded! GIT_SUBMODULE was turned off or failed. Please update submodules manually if you need build them.")
endif()
if (NOT DEFINED TD_GRANT) if (NOT DEFINED TD_GRANT)
SET(TD_GRANT FALSE) SET(TD_GRANT FALSE)
endif() endif()
...@@ -44,9 +65,8 @@ ENDIF () ...@@ -44,9 +65,8 @@ ENDIF ()
IF (TD_WINDOWS) IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}") MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE)
SET(COMMON_FLAGS "/W3 /D_WIN32") SET(COMMON_FLAGS "/W3 /D_WIN32")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900)) # IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18") # SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
# ENDIF () # ENDIF ()
...@@ -55,20 +75,34 @@ IF (TD_WINDOWS) ...@@ -55,20 +75,34 @@ IF (TD_WINDOWS)
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMMON_FLAGS}") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMMON_FLAGS}")
ELSE () ELSE ()
IF (${COVER} MATCHES "true")
MESSAGE(STATUS "Test coverage mode, add extra flags")
SET(GCC_COVERAGE_COMPILE_FLAGS "-fprofile-arcs -ftest-coverage")
SET(GCC_COVERAGE_LINK_FLAGS "-lgcov --coverage")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS} ${GCC_COVERAGE_LINK_FLAGS}")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS} ${GCC_COVERAGE_LINK_FLAGS}")
ENDIF ()
IF (${SANITIZER} MATCHES "true") IF (${SANITIZER} MATCHES "true")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
MESSAGE(STATUS "Will compile with Address Sanitizer!") MESSAGE(STATUS "Will compile with Address Sanitizer!")
ELSE () ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3")
ENDIF () ENDIF ()
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}") MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64") IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_")
ELSE () ELSE ()
ADD_DEFINITIONS("-msse4.2 -mfma") ADD_DEFINITIONS("-msse4.2")
IF("${FMA_SUPPORT}" MATCHES "true")
MESSAGE(STATUS "turn fma function support on")
ADD_DEFINITIONS("-mfma")
ELSE ()
MESSAGE(STATUS "turn fma function support off")
ENDIF()
ENDIF () ENDIF ()
ENDIF () ENDIF ()
...@@ -48,6 +48,12 @@ IF(${TD_WINDOWS}) ...@@ -48,6 +48,12 @@ IF(${TD_WINDOWS})
ENDIF () ENDIF ()
option(
BUILD_ADDR2LINE
"If build addr2line"
OFF
)
option( option(
BUILD_TEST BUILD_TEST
"If build unit tests using googletest" "If build unit tests using googletest"
......
# libdwarf
ExternalProject_Add(libdwarf
GIT_REPOSITORY https://github.com/davea42/libdwarf-code.git
GIT_TAG libdwarf-0.3.1
SOURCE_DIR "${TD_CONTRIB_DIR}/libdwarf"
BINARY_DIR "${TD_CONTRIB_DIR}/libdwarf"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
...@@ -98,6 +98,12 @@ if(${BUILD_WITH_NURAFT}) ...@@ -98,6 +98,12 @@ if(${BUILD_WITH_NURAFT})
cat("${TD_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_NURAFT}) endif(${BUILD_WITH_NURAFT})
# addr2line
if(${BUILD_ADDR2LINE})
cat("${TD_SUPPORT_DIR}/libdwarf_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/addr2line_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_ADDR2LINE})
# download dependencies # download dependencies
configure_file(${CONTRIB_TMP_FILE} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt") configure_file(${CONTRIB_TMP_FILE} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
...@@ -228,7 +234,7 @@ endif() ...@@ -228,7 +234,7 @@ endif()
# iconv # iconv
if(${BUILD_WITH_ICONV}) if(${BUILD_WITH_ICONV})
add_subdirectory(iconv EXCLUDE_FROM_ALL) add_library(iconv STATIC iconv/win_iconv.c)
endif(${BUILD_WITH_ICONV}) endif(${BUILD_WITH_ICONV})
# wingetopt # wingetopt
...@@ -327,7 +333,48 @@ if(${BUILD_WITH_SQLITE}) ...@@ -327,7 +333,48 @@ if(${BUILD_WITH_SQLITE})
endif(NOT TD_WINDOWS) endif(NOT TD_WINDOWS)
endif(${BUILD_WITH_SQLITE}) endif(${BUILD_WITH_SQLITE})
# pthread # addr2line
if(${BUILD_ADDR2LINE})
check_include_file( "sys/types.h" HAVE_SYS_TYPES_H)
check_include_file( "sys/stat.h" HAVE_SYS_STAT_H )
check_include_file( "inttypes.h" HAVE_INTTYPES_H )
check_include_file( "stddef.h" HAVE_STDDEF_H )
check_include_file( "stdlib.h" HAVE_STDLIB_H )
check_include_file( "string.h" HAVE_STRING_H )
check_include_file( "memory.h" HAVE_MEMORY_H )
check_include_file( "strings.h" HAVE_STRINGS_H )
check_include_file( "stdint.h" HAVE_STDINT_H )
check_include_file( "unistd.h" HAVE_UNISTD_H )
check_include_file( "sgidefs.h" HAVE_SGIDEFS_H )
check_include_file( "stdafx.h" HAVE_STDAFX_H )
check_include_file( "elf.h" HAVE_ELF_H )
check_include_file( "libelf.h" HAVE_LIBELF_H )
check_include_file( "libelf/libelf.h" HAVE_LIBELF_LIBELF_H)
check_include_file( "alloca.h" HAVE_ALLOCA_H )
check_include_file( "elfaccess.h" HAVE_ELFACCESS_H)
check_include_file( "sys/elf_386.h" HAVE_SYS_ELF_386_H )
check_include_file( "sys/elf_amd64.h" HAVE_SYS_ELF_AMD64_H)
check_include_file( "sys/elf_sparc.h" HAVE_SYS_ELF_SPARC_H)
check_include_file( "sys/ia64/elf.h" HAVE_SYS_IA64_ELF_H )
set(VERSION 0.3.1)
set(PACKAGE_VERSION "\"${VERSION}\"")
configure_file(libdwarf/cmake/config.h.cmake config.h)
file(GLOB_RECURSE LIBDWARF_SOURCES "libdwarf/src/lib/libdwarf/*.c")
add_library(libdwarf STATIC ${LIBDWARF_SOURCES})
set_target_properties(libdwarf PROPERTIES OUTPUT_NAME "libdwarf")
if(HAVE_LIBELF_H OR HAVE_LIBELF_LIBELF_H)
target_link_libraries(libdwarf PUBLIC libelf)
endif()
target_include_directories(libdwarf SYSTEM PUBLIC "libdwarf/src/lib/libdwarf" ${CMAKE_BINARY_DIR}/contrib)
file(READ "addr2line/addr2line.c" ADDR2LINE_CONTENT)
string(REPLACE "static int" "int" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}")
string(REPLACE "static void" "void" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}")
string(REPLACE "main(" "main_addr2line(" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}")
file(WRITE "addr2line/addr2line.c" "${ADDR2LINE_CONTENT}")
add_library(addr2line STATIC "addr2line/addr2line.c")
target_link_libraries(addr2line PUBLIC libdwarf dl z)
target_include_directories(addr2line PUBLIC "libdwarf/src/lib/libdwarf" )
endif(${BUILD_ADDR2LINE})
# ================================================================================================ # ================================================================================================
......
...@@ -24,7 +24,7 @@ static void msg_process(TAOS_RES* msg) { ...@@ -24,7 +24,7 @@ static void msg_process(TAOS_RES* msg) {
char buf[1024]; char buf[1024];
memset(buf, 0, 1024); memset(buf, 0, 1024);
printf("topic: %s\n", tmq_get_topic_name(msg)); printf("topic: %s\n", tmq_get_topic_name(msg));
printf("vg:%d\n", tmq_get_vgroup_id(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg));
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
...@@ -140,8 +140,8 @@ int32_t create_topic() { ...@@ -140,8 +140,8 @@ int32_t create_topic() {
return 0; return 0;
} }
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) { void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
printf("commit %d\n", resp); printf("commit %d tmq %p offsets %p param %p\n", resp, tmq, offsets, param);
} }
tmq_t* build_consumer() { tmq_t* build_consumer() {
...@@ -161,7 +161,7 @@ tmq_t* build_consumer() { ...@@ -161,7 +161,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/ /*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq); assert(tmq);
return tmq; return tmq;
...@@ -215,12 +215,24 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -215,12 +215,24 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
return; return;
} }
tmq_list_t* subList = NULL;
tmq_subscription(tmq, &subList);
char** subTopics = tmq_list_to_c_array(subList);
int32_t sz = tmq_list_get_size(subList);
printf("subscribed topics: ");
for (int32_t i = 0; i < sz; i++) {
printf("%s, ", subTopics[i]);
}
printf("\n");
tmq_list_destroy(subList);
while (running) { while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) { if (tmqmessage) {
msg_process(tmqmessage); msg_process(tmqmessage);
taos_free_result(tmqmessage); taos_free_result(tmqmessage);
tmq_commit(tmq, NULL, 1);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
} }
} }
......
...@@ -25,7 +25,7 @@ int32_t init_env() { ...@@ -25,7 +25,7 @@ int32_t init_env() {
return -1; return -1;
} }
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -78,11 +78,15 @@ int32_t create_stream() { ...@@ -78,11 +78,15 @@ int32_t create_stream() {
taos_free_result(pRes); taos_free_result(pRes);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1"; /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
pRes = tmq_create_stream(pConn, "stream1", "out1", sql); /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(
pConn,
"create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
"from tu1 interval(10m)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
......
...@@ -86,9 +86,9 @@ typedef struct taosField { ...@@ -86,9 +86,9 @@ typedef struct taosField {
} TAOS_FIELD; } TAOS_FIELD;
#ifdef WINDOWS #ifdef WINDOWS
#define DLL_EXPORT __declspec(dllexport) #define DLL_EXPORT __declspec(dllexport)
#else #else
#define DLL_EXPORT #define DLL_EXPORT
#endif #endif
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
...@@ -213,7 +213,7 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; ...@@ -213,7 +213,7 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t; typedef struct tmq_list_t tmq_list_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *)); typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
...@@ -221,15 +221,12 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *); ...@@ -221,15 +221,12 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *); DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
#if 0
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
#endif
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */ /* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
...@@ -240,6 +237,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t * ...@@ -240,6 +237,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
#endif #endif
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t { enum tmq_conf_res_t {
...@@ -253,11 +251,11 @@ typedef enum tmq_conf_res_t tmq_conf_res_t; ...@@ -253,11 +251,11 @@ typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t *tmq_conf_new(); DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO // TODO
#if 0 #if 0
...@@ -268,14 +266,9 @@ DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); ...@@ -268,14 +266,9 @@ DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
#endif #endif
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
#endif
DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql);
/* ------------------------------ TMQ END -------------------------------- */ /* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build #if 1 // Shuduo: temporary enable for app build
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
#endif #endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#ifndef TDENGINE_SYSTABLE_H
#define TDENGINE_SYSTABLE_H
#define TSDB_INFORMATION_SCHEMA_DB "information_schema"
#define TSDB_INS_TABLE_DNODES "dnodes"
#define TSDB_INS_TABLE_MNODES "mnodes"
#define TSDB_INS_TABLE_MODULES "modules"
#define TSDB_INS_TABLE_QNODES "qnodes"
#define TSDB_INS_TABLE_BNODES "bnodes"
#define TSDB_INS_TABLE_SNODES "snodes"
#define TSDB_INS_TABLE_CLUSTER "cluster"
#define TSDB_INS_TABLE_USER_DATABASES "user_databases"
#define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions"
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
#define TSDB_INS_TABLE_USER_STABLES "user_stables"
#define TSDB_INS_TABLE_USER_STREAMS "user_streams"
#define TSDB_INS_TABLE_USER_TABLES "user_tables"
#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed"
#define TSDB_INS_TABLE_USER_USERS "user_users"
#define TSDB_INS_TABLE_LICENCES "grants"
#define TSDB_INS_TABLE_VGROUPS "vgroups"
#define TSDB_INS_TABLE_VNODES "vnodes"
#define TSDB_INS_TABLE_CONFIGS "configs"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "smas"
#define TSDB_PERFS_TABLE_CONNECTIONS "connections"
#define TSDB_PERFS_TABLE_QUERIES "queries"
#define TSDB_PERFS_TABLE_TOPICS "topics"
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
#define TSDB_PERFS_TABLE_TRANS "trans"
#define TSDB_PERFS_TABLE_STREAMS "streams"
typedef struct SSysDbTableSchema {
const char* name;
const int32_t type;
const int32_t bytes;
} SSysDbTableSchema;
typedef struct SSysTableMeta {
const char* name;
const SSysDbTableSchema* schema;
const int32_t colNum;
} SSysTableMeta;
void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size);
void getPerfDbMeta(const SSysTableMeta** pPerfsTableMeta, size_t* size);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_SYSTABLE_H
...@@ -28,6 +28,7 @@ typedef int64_t tb_uid_t; ...@@ -28,6 +28,7 @@ typedef int64_t tb_uid_t;
#define TSWINDOW_INITIALIZER ((STimeWindow){INT64_MIN, INT64_MAX}) #define TSWINDOW_INITIALIZER ((STimeWindow){INT64_MIN, INT64_MAX})
#define TSWINDOW_DESC_INITIALIZER ((STimeWindow){INT64_MAX, INT64_MIN}) #define TSWINDOW_DESC_INITIALIZER ((STimeWindow){INT64_MAX, INT64_MIN})
#define IS_TSWINDOW_SPECIFIED(win) (((win).skey != INT64_MIN) || ((win).ekey != INT64_MAX)) #define IS_TSWINDOW_SPECIFIED(win) (((win).skey != INT64_MIN) || ((win).ekey != INT64_MAX))
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
typedef enum { typedef enum {
TSDB_SUPER_TABLE = 1, // super table TSDB_SUPER_TABLE = 1, // super table
...@@ -71,13 +72,19 @@ typedef enum { ...@@ -71,13 +72,19 @@ typedef enum {
TSDB_SMA_STAT_DROPPED = 2, // sma dropped TSDB_SMA_STAT_DROPPED = 2, // sma dropped
} ETsdbSmaStat; // bit operation } ETsdbSmaStat; // bit operation
typedef enum { typedef enum {
TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA
TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA
TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA
} ETsdbSmaType; } ETsdbSmaType;
typedef enum {
TSDB_RETENTION_L0 = 0,
TSDB_RETENTION_L1 = 1,
TSDB_RETENTION_L2 = 2,
TSDB_RETENTION_MAX = 3
} ERetentionLevel;
extern char *qtypeStr[]; extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11 #define TSDB_PORT_HTTP 11
......
...@@ -45,6 +45,12 @@ enum { ...@@ -45,6 +45,12 @@ enum {
STREAM_TRIGGER__BY_EVENT_TIME, STREAM_TRIGGER__BY_EVENT_TIME,
}; };
typedef enum EStreamType {
STREAM_NORMAL = 1,
STREAM_INVERT,
STREAM_INVALID,
} EStreamType;
typedef struct { typedef struct {
uint32_t numOfTables; uint32_t numOfTables;
SArray* pGroupList; SArray* pGroupList;
...@@ -65,18 +71,17 @@ typedef struct SDataBlockInfo { ...@@ -65,18 +71,17 @@ typedef struct SDataBlockInfo {
STimeWindow window; STimeWindow window;
int32_t rows; int32_t rows;
int32_t rowSize; int32_t rowSize;
union { int64_t uid; // the uid of table, from which current data block comes
int64_t uid; // from which table of uid, comes from this data block int64_t blockId; // block id, generated by physical planner
int64_t blockId;
};
uint64_t groupId; // no need to serialize uint64_t groupId; // no need to serialize
int16_t numOfCols; int16_t numOfCols;
int16_t hasVarCol; int16_t hasVarCol;
int16_t capacity; int32_t capacity;
EStreamType type;
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg* pBlockAgg; SColumnDataAgg** pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData> SArray* pDataBlock; // SArray<SColumnInfoData>
SDataBlockInfo info; SDataBlockInfo info;
} SSDataBlock; } SSDataBlock;
...@@ -117,8 +122,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks); ...@@ -117,8 +122,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData); void colDataDestroy(SColumnInfoData* pColData);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols,
// sometimes info.numOfCols != array size
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
...@@ -196,6 +199,21 @@ typedef struct SExprInfo { ...@@ -196,6 +199,21 @@ typedef struct SExprInfo {
struct tExprNode* pExpr; struct tExprNode* pExpr;
} SExprInfo; } SExprInfo;
typedef struct {
const char* key;
int32_t keyLen;
uint8_t type;
int16_t length;
union{
const char* value;
int64_t i;
uint64_t u;
double d;
float f;
};
int32_t valueLen;
} SSmlKv;
#define QUERY_ASC_FORWARD_STEP 1 #define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1 #define QUERY_DESC_FORWARD_STEP -1
......
...@@ -13,19 +13,33 @@ ...@@ -13,19 +13,33 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TDB_TXN_H_ #ifndef _TD_TDATA_H_
#define _TDB_TXN_H_ #define _TD_TDATA_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg, #include "os.h"
int flags);
int tdbTxnClose(TXN *pTxn); typedef struct STaosData TDATA, tdata_t;
typedef enum {
TAOS_META_STABLE_DATA = 0, // super table meta
TAOS_META_TABLE_DATA, // non-super table meta
TAOS_TS_ROW_DATA, // row time-series data
TAOS_TS_COL_DATA, // col time-series data
TAOS_DATA_MAX
} ETaosDataT;
struct STaosData {
ETaosDataT type;
uint32_t nPayload;
uint8_t *pPayload;
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TDB_TXN_H_*/ #endif /*_TD_TDATA_H_*/
\ No newline at end of file \ No newline at end of file
...@@ -54,6 +54,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); ...@@ -54,6 +54,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0) } while (0)
#define colDataSetNotNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) #define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) #define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
...@@ -64,15 +69,14 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); ...@@ -64,15 +69,14 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes)) #define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes))
// SColumnInfoData, rowNumber // SColumnInfoData, rowNumber
#define colDataGetData(p1_, r_) \ #define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) \ ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
: colDataGetNumData(p1_, r_))
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) { static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON){ if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
if(colDataIsNull_var(pColumnInfoData, row)){ if (colDataIsNull_var(pColumnInfoData, row)) {
return true; return true;
} }
char *data = colDataGetVarData(pColumnInfoData, row); char* data = colDataGetVarData(pColumnInfoData, row);
return (*data == TSDB_DATA_TYPE_NULL); return (*data == TSDB_DATA_TYPE_NULL);
} }
...@@ -80,7 +84,7 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, ...@@ -80,7 +84,7 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
return false; return false;
} }
if (pColumnInfoData->info.type== TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) { if (pColumnInfoData->info.type == TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) {
return colDataIsNull_var(pColumnInfoData, row); return colDataIsNull_var(pColumnInfoData, row);
} else { } else {
if (pColumnInfoData->nullbitmap == NULL) { if (pColumnInfoData->nullbitmap == NULL) {
...@@ -132,7 +136,7 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin ...@@ -132,7 +136,7 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) { static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
for (int32_t i = start; i < start + nRows; ++i) { for (int32_t i = start; i < start + nRows; ++i) {
colDataSetNull_var(pColumnInfoData,i); // it is a null value of VAR type. colDataSetNull_var(pColumnInfoData, i); // it is a null value of VAR type.
} }
} else { } else {
for (int32_t i = start; i < start + nRows; ++i) { for (int32_t i = start; i < start + nRows; ++i) {
...@@ -183,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u ...@@ -183,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
} }
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
uint32_t numOfRow2); const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
...@@ -225,6 +229,11 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); ...@@ -225,6 +229,11 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockDebugShowData(const SArray* dataBlocks); void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
} }
...@@ -238,10 +247,10 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32 ...@@ -238,10 +247,10 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) { int8_t needCompress) {
int32_t* actualLen = (int32_t*) data; int32_t* actualLen = (int32_t*)data;
data += sizeof(int32_t); data += sizeof(int32_t);
uint64_t* groupId = (uint64_t*) data; uint64_t* groupId = (uint64_t*)data;
data += sizeof(uint64_t); data += sizeof(uint64_t);
int32_t* colSizes = (int32_t*)data; int32_t* colSizes = (int32_t*)data;
......
此差异已折叠。
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
#define _TD_COMMON_GLOBAL_H_ #define _TD_COMMON_GLOBAL_H_
#include "tarray.h" #include "tarray.h"
#include "tdef.h"
#include "tconfig.h" #include "tconfig.h"
#include "tdef.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -121,11 +121,19 @@ extern char tsCompressor[]; ...@@ -121,11 +121,19 @@ extern char tsCompressor[];
extern int32_t tsDiskCfgNum; extern int32_t tsDiskCfgNum;
extern SDiskCfg tsDiskCfg[]; extern SDiskCfg tsDiskCfg[];
// udf
extern bool tsStartUdfd;
// internal
extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
char *apolloUrl, SArray *pArgs, bool tsc); const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
bool tsc);
void taosCleanupCfg(); void taosCleanupCfg();
void taosCfgDynamicOptions(const char *option, const char *value); void taosCfgDynamicOptions(const char *option, const char *value);
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary); void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary);
......
此差异已折叠。
...@@ -42,6 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy ...@@ -42,6 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp);
typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type); typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type);
typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const char* desc); typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const char* desc);
...@@ -52,9 +53,11 @@ typedef struct { ...@@ -52,9 +53,11 @@ typedef struct {
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;
SendRedirectRspFp sendRedirectRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp; ReleaseHandleFp releaseHandleFp;
ReportStartup reportStartupFp; ReportStartup reportStartupFp;
void* clientRpc;
} SMsgCb; } SMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
...@@ -62,6 +65,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); ...@@ -62,6 +65,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
void tmsgSendRsp(const SRpcMsg* pRsp); void tmsgSendRsp(const SRpcMsg* pRsp);
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
void tmsgReleaseHandle(void* handle, int8_t type); void tmsgReleaseHandle(void* handle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc); void tmsgReportStartup(const char* name, const char* desc);
......
...@@ -170,9 +170,9 @@ enum { ...@@ -170,9 +170,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "vnode-update-tag-val", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "vnode-update-tag-val", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateTbReq, SVCreateTbRsp) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
...@@ -202,8 +202,8 @@ enum { ...@@ -202,8 +202,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
// sync integration
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL)
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifndef _TD_COMMON_NAME_H_ #ifndef _TD_COMMON_NAME_H_
#define _TD_COMMON_NAME_H_ #define _TD_COMMON_NAME_H_
#include "tarray.h"
#include "tdef.h" #include "tdef.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -62,6 +63,18 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId); ...@@ -62,6 +63,18 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId);
bool tNameDBNameEqual(SName* left, SName* right); bool tNameDBNameEqual(SName* left, SName* right);
typedef struct {
// input
SArray* tags; // element is SSmlKv
const char* sTableName; // super table name
uint8_t sTableNameLen; // the length of super table name
// output
char* childTableName; // must have size of TSDB_TABLE_NAME_LEN;
uint64_t uid; // child table uid, may be useful
} RandTableName;
void buildChildTableName(RandTableName* rName);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
此差异已折叠。
...@@ -51,201 +51,208 @@ ...@@ -51,201 +51,208 @@
#define TK_USER 33 #define TK_USER 33
#define TK_PRIVILEGE 34 #define TK_PRIVILEGE 34
#define TK_DROP 35 #define TK_DROP 35
#define TK_DNODE 36 #define TK_GRANT 36
#define TK_PORT 37 #define TK_ON 37
#define TK_NK_INTEGER 38 #define TK_TO 38
#define TK_DNODES 39 #define TK_REVOKE 39
#define TK_NK_IPTOKEN 40 #define TK_FROM 40
#define TK_LOCAL 41 #define TK_NK_COMMA 41
#define TK_QNODE 42 #define TK_READ 42
#define TK_ON 43 #define TK_WRITE 43
#define TK_BNODE 44 #define TK_NK_DOT 44
#define TK_SNODE 45 #define TK_DNODE 45
#define TK_MNODE 46 #define TK_PORT 46
#define TK_DATABASE 47 #define TK_NK_INTEGER 47
#define TK_USE 48 #define TK_DNODES 48
#define TK_IF 49 #define TK_NK_IPTOKEN 49
#define TK_NOT 50 #define TK_LOCAL 50
#define TK_EXISTS 51 #define TK_QNODE 51
#define TK_BUFFER 52 #define TK_BNODE 52
#define TK_CACHELAST 53 #define TK_SNODE 53
#define TK_COMP 54 #define TK_MNODE 54
#define TK_DAYS 55 #define TK_DATABASE 55
#define TK_NK_VARIABLE 56 #define TK_USE 56
#define TK_FSYNC 57 #define TK_IF 57
#define TK_MAXROWS 58 #define TK_NOT 58
#define TK_MINROWS 59 #define TK_EXISTS 59
#define TK_KEEP 60 #define TK_BUFFER 60
#define TK_PAGES 61 #define TK_CACHELAST 61
#define TK_PAGESIZE 62 #define TK_COMP 62
#define TK_PRECISION 63 #define TK_DAYS 63
#define TK_REPLICA 64 #define TK_NK_VARIABLE 64
#define TK_STRICT 65 #define TK_FSYNC 65
#define TK_WAL 66 #define TK_MAXROWS 66
#define TK_VGROUPS 67 #define TK_MINROWS 67
#define TK_SINGLE_STABLE 68 #define TK_KEEP 68
#define TK_RETENTIONS 69 #define TK_PAGES 69
#define TK_NK_COMMA 70 #define TK_PAGESIZE 70
#define TK_NK_COLON 71 #define TK_PRECISION 71
#define TK_TABLE 72 #define TK_REPLICA 72
#define TK_NK_LP 73 #define TK_STRICT 73
#define TK_NK_RP 74 #define TK_WAL 74
#define TK_STABLE 75 #define TK_VGROUPS 75
#define TK_ADD 76 #define TK_SINGLE_STABLE 76
#define TK_COLUMN 77 #define TK_RETENTIONS 77
#define TK_MODIFY 78 #define TK_NK_COLON 78
#define TK_RENAME 79 #define TK_TABLE 79
#define TK_TAG 80 #define TK_NK_LP 80
#define TK_SET 81 #define TK_NK_RP 81
#define TK_NK_EQ 82 #define TK_STABLE 82
#define TK_USING 83 #define TK_ADD 83
#define TK_TAGS 84 #define TK_COLUMN 84
#define TK_NK_DOT 85 #define TK_MODIFY 85
#define TK_COMMENT 86 #define TK_RENAME 86
#define TK_BOOL 87 #define TK_TAG 87
#define TK_TINYINT 88 #define TK_SET 88
#define TK_SMALLINT 89 #define TK_NK_EQ 89
#define TK_INT 90 #define TK_USING 90
#define TK_INTEGER 91 #define TK_TAGS 91
#define TK_BIGINT 92 #define TK_COMMENT 92
#define TK_FLOAT 93 #define TK_BOOL 93
#define TK_DOUBLE 94 #define TK_TINYINT 94
#define TK_BINARY 95 #define TK_SMALLINT 95
#define TK_TIMESTAMP 96 #define TK_INT 96
#define TK_NCHAR 97 #define TK_INTEGER 97
#define TK_UNSIGNED 98 #define TK_BIGINT 98
#define TK_JSON 99 #define TK_FLOAT 99
#define TK_VARCHAR 100 #define TK_DOUBLE 100
#define TK_MEDIUMBLOB 101 #define TK_BINARY 101
#define TK_BLOB 102 #define TK_TIMESTAMP 102
#define TK_VARBINARY 103 #define TK_NCHAR 103
#define TK_DECIMAL 104 #define TK_UNSIGNED 104
#define TK_DELAY 105 #define TK_JSON 105
#define TK_FILE_FACTOR 106 #define TK_VARCHAR 106
#define TK_NK_FLOAT 107 #define TK_MEDIUMBLOB 107
#define TK_ROLLUP 108 #define TK_BLOB 108
#define TK_TTL 109 #define TK_VARBINARY 109
#define TK_SMA 110 #define TK_DECIMAL 110
#define TK_SHOW 111 #define TK_DELAY 111
#define TK_DATABASES 112 #define TK_FILE_FACTOR 112
#define TK_TABLES 113 #define TK_NK_FLOAT 113
#define TK_STABLES 114 #define TK_ROLLUP 114
#define TK_MNODES 115 #define TK_TTL 115
#define TK_MODULES 116 #define TK_SMA 116
#define TK_QNODES 117 #define TK_SHOW 117
#define TK_FUNCTIONS 118 #define TK_DATABASES 118
#define TK_INDEXES 119 #define TK_TABLES 119
#define TK_FROM 120 #define TK_STABLES 120
#define TK_ACCOUNTS 121 #define TK_MNODES 121
#define TK_APPS 122 #define TK_MODULES 122
#define TK_CONNECTIONS 123 #define TK_QNODES 123
#define TK_LICENCE 124 #define TK_FUNCTIONS 124
#define TK_GRANTS 125 #define TK_INDEXES 125
#define TK_QUERIES 126 #define TK_ACCOUNTS 126
#define TK_SCORES 127 #define TK_APPS 127
#define TK_TOPICS 128 #define TK_CONNECTIONS 128
#define TK_VARIABLES 129 #define TK_LICENCE 129
#define TK_BNODES 130 #define TK_GRANTS 130
#define TK_SNODES 131 #define TK_QUERIES 131
#define TK_CLUSTER 132 #define TK_SCORES 132
#define TK_LIKE 133 #define TK_TOPICS 133
#define TK_INDEX 134 #define TK_VARIABLES 134
#define TK_FULLTEXT 135 #define TK_BNODES 135
#define TK_FUNCTION 136 #define TK_SNODES 136
#define TK_INTERVAL 137 #define TK_CLUSTER 137
#define TK_TOPIC 138 #define TK_TRANSACTIONS 138
#define TK_AS 139 #define TK_LIKE 139
#define TK_WITH 140 #define TK_INDEX 140
#define TK_SCHEMA 141 #define TK_FULLTEXT 141
#define TK_DESC 142 #define TK_FUNCTION 142
#define TK_DESCRIBE 143 #define TK_INTERVAL 143
#define TK_RESET 144 #define TK_TOPIC 144
#define TK_QUERY 145 #define TK_AS 145
#define TK_CACHE 146 #define TK_WITH 146
#define TK_EXPLAIN 147 #define TK_SCHEMA 147
#define TK_ANALYZE 148 #define TK_DESC 148
#define TK_VERBOSE 149 #define TK_DESCRIBE 149
#define TK_NK_BOOL 150 #define TK_RESET 150
#define TK_RATIO 151 #define TK_QUERY 151
#define TK_COMPACT 152 #define TK_CACHE 152
#define TK_VNODES 153 #define TK_EXPLAIN 153
#define TK_IN 154 #define TK_ANALYZE 154
#define TK_OUTPUTTYPE 155 #define TK_VERBOSE 155
#define TK_AGGREGATE 156 #define TK_NK_BOOL 156
#define TK_BUFSIZE 157 #define TK_RATIO 157
#define TK_STREAM 158 #define TK_COMPACT 158
#define TK_INTO 159 #define TK_VNODES 159
#define TK_TRIGGER 160 #define TK_IN 160
#define TK_AT_ONCE 161 #define TK_OUTPUTTYPE 161
#define TK_WINDOW_CLOSE 162 #define TK_AGGREGATE 162
#define TK_WATERMARK 163 #define TK_BUFSIZE 163
#define TK_KILL 164 #define TK_STREAM 164
#define TK_CONNECTION 165 #define TK_INTO 165
#define TK_MERGE 166 #define TK_TRIGGER 166
#define TK_VGROUP 167 #define TK_AT_ONCE 167
#define TK_REDISTRIBUTE 168 #define TK_WINDOW_CLOSE 168
#define TK_SPLIT 169 #define TK_WATERMARK 169
#define TK_SYNCDB 170 #define TK_KILL 170
#define TK_NULL 171 #define TK_CONNECTION 171
#define TK_NK_QUESTION 172 #define TK_TRANSACTION 172
#define TK_NK_ARROW 173 #define TK_MERGE 173
#define TK_ROWTS 174 #define TK_VGROUP 174
#define TK_TBNAME 175 #define TK_REDISTRIBUTE 175
#define TK_QSTARTTS 176 #define TK_SPLIT 176
#define TK_QENDTS 177 #define TK_SYNCDB 177
#define TK_WSTARTTS 178 #define TK_NULL 178
#define TK_WENDTS 179 #define TK_NK_QUESTION 179
#define TK_WDURATION 180 #define TK_NK_ARROW 180
#define TK_CAST 181 #define TK_ROWTS 181
#define TK_NOW 182 #define TK_TBNAME 182
#define TK_TODAY 183 #define TK_QSTARTTS 183
#define TK_TIMEZONE 184 #define TK_QENDTS 184
#define TK_COUNT 185 #define TK_WSTARTTS 185
#define TK_FIRST 186 #define TK_WENDTS 186
#define TK_LAST 187 #define TK_WDURATION 187
#define TK_LAST_ROW 188 #define TK_CAST 188
#define TK_BETWEEN 189 #define TK_NOW 189
#define TK_IS 190 #define TK_TODAY 190
#define TK_NK_LT 191 #define TK_TIMEZONE 191
#define TK_NK_GT 192 #define TK_COUNT 192
#define TK_NK_LE 193 #define TK_FIRST 193
#define TK_NK_GE 194 #define TK_LAST 194
#define TK_NK_NE 195 #define TK_LAST_ROW 195
#define TK_MATCH 196 #define TK_BETWEEN 196
#define TK_NMATCH 197 #define TK_IS 197
#define TK_CONTAINS 198 #define TK_NK_LT 198
#define TK_JOIN 199 #define TK_NK_GT 199
#define TK_INNER 200 #define TK_NK_LE 200
#define TK_SELECT 201 #define TK_NK_GE 201
#define TK_DISTINCT 202 #define TK_NK_NE 202
#define TK_WHERE 203 #define TK_MATCH 203
#define TK_PARTITION 204 #define TK_NMATCH 204
#define TK_BY 205 #define TK_CONTAINS 205
#define TK_SESSION 206 #define TK_JOIN 206
#define TK_STATE_WINDOW 207 #define TK_INNER 207
#define TK_SLIDING 208 #define TK_SELECT 208
#define TK_FILL 209 #define TK_DISTINCT 209
#define TK_VALUE 210 #define TK_WHERE 210
#define TK_NONE 211 #define TK_PARTITION 211
#define TK_PREV 212 #define TK_BY 212
#define TK_LINEAR 213 #define TK_SESSION 213
#define TK_NEXT 214 #define TK_STATE_WINDOW 214
#define TK_GROUP 215 #define TK_SLIDING 215
#define TK_HAVING 216 #define TK_FILL 216
#define TK_ORDER 217 #define TK_VALUE 217
#define TK_SLIMIT 218 #define TK_NONE 218
#define TK_SOFFSET 219 #define TK_PREV 219
#define TK_LIMIT 220 #define TK_LINEAR 220
#define TK_OFFSET 221 #define TK_NEXT 221
#define TK_ASC 222 #define TK_GROUP 222
#define TK_NULLS 223 #define TK_HAVING 223
#define TK_ID 224 #define TK_ORDER 224
#define TK_NK_BITNOT 225 #define TK_SLIMIT 225
#define TK_INSERT 226 #define TK_SOFFSET 226
#define TK_VALUES 227 #define TK_LIMIT 227
#define TK_IMPORT 228 #define TK_OFFSET 228
#define TK_NK_SEMI 229 #define TK_ASC 229
#define TK_FILE 230 #define TK_NULLS 230
#define TK_ID 231
#define TK_NK_BITNOT 232
#define TK_INSERT 233
#define TK_VALUES 234
#define TK_IMPORT 235
#define TK_NK_SEMI 236
#define TK_FILE 237
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301
......
...@@ -186,14 +186,14 @@ typedef struct { ...@@ -186,14 +186,14 @@ typedef struct {
#define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t))) #define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t)))
#define IS_MATHABLE_TYPE(_t) (IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP)) #define IS_MATHABLE_TYPE(_t) (IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
#define IS_VALID_TINYINT(_t) ((_t) > INT8_MIN && (_t) <= INT8_MAX) #define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
#define IS_VALID_SMALLINT(_t) ((_t) > INT16_MIN && (_t) <= INT16_MAX) #define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
#define IS_VALID_INT(_t) ((_t) > INT32_MIN && (_t) <= INT32_MAX) #define IS_VALID_INT(_t) ((_t) >= INT32_MIN && (_t) <= INT32_MAX)
#define IS_VALID_BIGINT(_t) ((_t) > INT64_MIN && (_t) <= INT64_MAX) #define IS_VALID_BIGINT(_t) ((_t) >= INT64_MIN && (_t) <= INT64_MAX)
#define IS_VALID_UTINYINT(_t) ((_t) >= 0 && (_t) < UINT8_MAX) #define IS_VALID_UTINYINT(_t) ((_t) >= 0 && (_t) <= UINT8_MAX)
#define IS_VALID_USMALLINT(_t) ((_t) >= 0 && (_t) < UINT16_MAX) #define IS_VALID_USMALLINT(_t) ((_t) >= 0 && (_t) <= UINT16_MAX)
#define IS_VALID_UINT(_t) ((_t) >= 0 && (_t) < UINT32_MAX) #define IS_VALID_UINT(_t) ((_t) >= 0 && (_t) <= UINT32_MAX)
#define IS_VALID_UBIGINT(_t) ((_t) >= 0 && (_t) < UINT64_MAX) #define IS_VALID_UBIGINT(_t) ((_t) >= 0 && (_t) <= UINT64_MAX)
#define IS_VALID_FLOAT(_t) ((_t) >= -FLT_MAX && (_t) <= FLT_MAX) #define IS_VALID_FLOAT(_t) ((_t) >= -FLT_MAX && (_t) <= FLT_MAX)
#define IS_VALID_DOUBLE(_t) ((_t) >= -DBL_MAX && (_t) <= DBL_MAX) #define IS_VALID_DOUBLE(_t) ((_t) >= -DBL_MAX && (_t) <= DBL_MAX)
......
...@@ -107,10 +107,9 @@ typedef enum { ...@@ -107,10 +107,9 @@ typedef enum {
typedef enum { typedef enum {
SDB_STATUS_INIT = 0, SDB_STATUS_INIT = 0,
SDB_STATUS_CREATING = 1, SDB_STATUS_CREATING = 1,
SDB_STATUS_UPDATING = 2, SDB_STATUS_DROPPING = 2,
SDB_STATUS_DROPPING = 3, SDB_STATUS_DROPPED = 3,
SDB_STATUS_READY = 4, SDB_STATUS_READY = 4,
SDB_STATUS_DROPPED = 5
} ESdbStatus; } ESdbStatus;
typedef enum { typedef enum {
......
...@@ -40,6 +40,11 @@ enum { ...@@ -40,6 +40,11 @@ enum {
CTG_DBG_STB_RENT_NUM, CTG_DBG_STB_RENT_NUM,
}; };
typedef enum {
AUTH_TYPE_READ = 1,
AUTH_TYPE_WRITE,
AUTH_TYPE_OTHER,
} AUTH_TYPE;
typedef struct SCatalogReq { typedef struct SCatalogReq {
SArray *pTableName; // element is SNAME SArray *pTableName; // element is SNAME
...@@ -57,6 +62,7 @@ typedef struct SMetaData { ...@@ -57,6 +62,7 @@ typedef struct SMetaData {
typedef struct SCatalogCfg { typedef struct SCatalogCfg {
uint32_t maxTblCacheNum; uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum; uint32_t maxDBCacheNum;
uint32_t maxUserCacheNum;
uint32_t dbRentSec; uint32_t dbRentSec;
uint32_t stbRentSec; uint32_t stbRentSec;
} SCatalogCfg; } SCatalogCfg;
...@@ -77,6 +83,11 @@ typedef struct SDbVgVersion { ...@@ -77,6 +83,11 @@ typedef struct SDbVgVersion {
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SDbVgVersion; } SDbVgVersion;
typedef struct SUserAuthVersion {
char user[TSDB_USER_LEN];
int32_t version;
} SUserAuthVersion;
typedef SDbCfgRsp SDbCfgInfo; typedef SDbCfgRsp SDbCfgInfo;
typedef SUserIndexRsp SIndexInfo; typedef SUserIndexRsp SIndexInfo;
...@@ -219,12 +230,18 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stable ...@@ -219,12 +230,18 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stable
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num); int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_t *num);
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg); int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
/** /**
* Destroy catalog and relase all resources * Destroy catalog and relase all resources
......
...@@ -45,6 +45,7 @@ typedef struct SInputData { ...@@ -45,6 +45,7 @@ typedef struct SInputData {
typedef struct SOutputData { typedef struct SOutputData {
int32_t numOfRows; int32_t numOfRows;
int32_t numOfCols;
int8_t compressed; int8_t compressed;
char* pData; char* pData;
bool queryEnd; bool queryEnd;
......
...@@ -22,6 +22,7 @@ extern "C" { ...@@ -22,6 +22,7 @@ extern "C" {
#include "query.h" #include "query.h"
#include "tcommon.h" #include "tcommon.h"
#include "tmsgcb.h"
typedef void* qTaskInfo_t; typedef void* qTaskInfo_t;
typedef void* DataSinkHandle; typedef void* DataSinkHandle;
...@@ -33,6 +34,8 @@ typedef struct SReadHandle { ...@@ -33,6 +34,8 @@ typedef struct SReadHandle {
void* meta; void* meta;
void* config; void* config;
void* vnode; void* vnode;
void* mnd;
SMsgCb* pMsgCb;
} SReadHandle; } SReadHandle;
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 #define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
...@@ -164,15 +167,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t ...@@ -164,15 +167,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/ */
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
/**
* release the query handle and decrease the reference count in cache
* @param pMgmt
* @param pQInfo
* @param freeHandle
* @return
*/
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes);
......
...@@ -165,7 +165,7 @@ typedef struct SInputColumnInfoData { ...@@ -165,7 +165,7 @@ typedef struct SInputColumnInfoData {
SColumnInfoData *pPTS; // primary timestamp column SColumnInfoData *pPTS; // primary timestamp column
SColumnInfoData **pData; SColumnInfoData **pData;
SColumnDataAgg **pColumnDataAgg; SColumnDataAgg **pColumnDataAgg;
uint64_t uid; // table uid uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions.
} SInputColumnInfoData; } SInputColumnInfoData;
// sql function runtime context // sql function runtime context
...@@ -207,7 +207,7 @@ typedef struct SqlFunctionCtx { ...@@ -207,7 +207,7 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pSrcBlock; struct SSDataBlock *pSrcBlock;
int32_t curBufPage; int32_t curBufPage;
char* udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx; } SqlFunctionCtx;
enum { enum {
...@@ -336,6 +336,17 @@ int32_t udfcOpen(); ...@@ -336,6 +336,17 @@ int32_t udfcOpen();
*/ */
int32_t udfcClose(); int32_t udfcClose();
/**
* start udfd that serves udf function invocation under dnode startDnodeId
* @param startDnodeId
* @return
*/
int32_t udfStartUdfd(int32_t startDnodeId);
/**
* stop udfd
* @return
*/
int32_t udfStopUdfd();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -31,15 +31,16 @@ typedef enum EFunctionType { ...@@ -31,15 +31,16 @@ typedef enum EFunctionType {
FUNCTION_TYPE_ELAPSED, FUNCTION_TYPE_ELAPSED,
FUNCTION_TYPE_IRATE, FUNCTION_TYPE_IRATE,
FUNCTION_TYPE_LAST_ROW, FUNCTION_TYPE_LAST_ROW,
FUNCTION_TYPE_LEASTSQUARES,
FUNCTION_TYPE_MAX, FUNCTION_TYPE_MAX,
FUNCTION_TYPE_MIN, FUNCTION_TYPE_MIN,
FUNCTION_TYPE_MODE, FUNCTION_TYPE_MODE,
FUNCTION_TYPE_PERCENTILE, FUNCTION_TYPE_PERCENTILE,
FUNCTION_TYPE_SPREAD, FUNCTION_TYPE_SPREAD,
FUNCTION_TYPE_STDDEV, FUNCTION_TYPE_STDDEV,
FUNCTION_TYPE_LEASTSQUARES,
FUNCTION_TYPE_SUM, FUNCTION_TYPE_SUM,
FUNCTION_TYPE_TWA, FUNCTION_TYPE_TWA,
FUNCTION_TYPE_HISTOGRAM,
// nonstandard SQL function // nonstandard SQL function
FUNCTION_TYPE_BOTTOM = 500, FUNCTION_TYPE_BOTTOM = 500,
...@@ -54,6 +55,8 @@ typedef enum EFunctionType { ...@@ -54,6 +55,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_TAIL, FUNCTION_TYPE_TAIL,
FUNCTION_TYPE_TOP, FUNCTION_TYPE_TOP,
FUNCTION_TYPE_UNIQUE, FUNCTION_TYPE_UNIQUE,
FUNCTION_TYPE_STATE_COUNT,
FUNCTION_TYPE_STATE_DURATION,
// math function // math function
FUNCTION_TYPE_ABS = 1000, FUNCTION_TYPE_ABS = 1000,
...@@ -112,6 +115,9 @@ typedef enum EFunctionType { ...@@ -112,6 +115,9 @@ typedef enum EFunctionType {
FUNCTION_TYPE_WENDTS, FUNCTION_TYPE_WENDTS,
FUNCTION_TYPE_WDURATION, FUNCTION_TYPE_WDURATION,
// internal function
FUNCTION_TYPE_SELECT_VALUE,
// user defined funcion // user defined funcion
FUNCTION_TYPE_UDF = 10000 FUNCTION_TYPE_UDF = 10000
} EFunctionType; } EFunctionType;
...@@ -140,6 +146,7 @@ bool fmIsScalarFunc(int32_t funcId); ...@@ -140,6 +146,7 @@ bool fmIsScalarFunc(int32_t funcId);
bool fmIsNonstandardSQLFunc(int32_t funcId); bool fmIsNonstandardSQLFunc(int32_t funcId);
bool fmIsStringFunc(int32_t funcId); bool fmIsStringFunc(int32_t funcId);
bool fmIsDatetimeFunc(int32_t funcId); bool fmIsDatetimeFunc(int32_t funcId);
bool fmIsSelectFunc(int32_t funcId);
bool fmIsTimelineFunc(int32_t funcId); bool fmIsTimelineFunc(int32_t funcId);
bool fmIsTimeorderFunc(int32_t funcId); bool fmIsTimeorderFunc(int32_t funcId);
bool fmIsPseudoColumnFunc(int32_t funcId); bool fmIsPseudoColumnFunc(int32_t funcId);
...@@ -149,6 +156,7 @@ bool fmIsWindowClauseFunc(int32_t funcId); ...@@ -149,6 +156,7 @@ bool fmIsWindowClauseFunc(int32_t funcId);
bool fmIsSpecialDataRequiredFunc(int32_t funcId); bool fmIsSpecialDataRequiredFunc(int32_t funcId);
bool fmIsDynamicScanOptimizedFunc(int32_t funcId); bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
bool fmIsMultiResFunc(int32_t funcId); bool fmIsMultiResFunc(int32_t funcId);
bool fmIsRepeatScanFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId); bool fmIsUserDefinedFunc(int32_t funcId);
typedef enum EFuncDataRequired { typedef enum EFuncDataRequired {
...@@ -162,6 +170,10 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin ...@@ -162,6 +170,10 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet); int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,13 +22,18 @@ ...@@ -22,13 +22,18 @@
#include "tmsg.h" #include "tmsg.h"
#include "tcommon.h" #include "tcommon.h"
#include "function.h" #include "function.h"
#include "tdatablock.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#define UDF_LISTEN_PIPE_NAME_LEN 32 #define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock." #ifdef _WIN32
#define UDF_LISTEN_PIPE_NAME_PREFIX "\\\\?\\pipe\\udfd.sock"
#else
#define UDF_LISTEN_PIPE_NAME_PREFIX ".udfd.sock."
#endif
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID" #define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
//====================================================================================== //======================================================================================
...@@ -39,7 +44,8 @@ enum { ...@@ -39,7 +44,8 @@ enum {
UDFC_CODE_PIPE_READ_ERR = -2, UDFC_CODE_PIPE_READ_ERR = -2,
UDFC_CODE_CONNECT_PIPE_ERR = -3, UDFC_CODE_CONNECT_PIPE_ERR = -3,
UDFC_CODE_LOAD_UDF_FAILURE = -4, UDFC_CODE_LOAD_UDF_FAILURE = -4,
UDFC_CODE_INVALID_STATE = -5 UDFC_CODE_INVALID_STATE = -5,
UDFC_CODE_NO_PIPE = -6,
}; };
typedef void *UdfcFuncHandle; typedef void *UdfcFuncHandle;
...@@ -54,14 +60,14 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *handle); ...@@ -54,14 +60,14 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *handle);
typedef struct SUdfColumnMeta { typedef struct SUdfColumnMeta {
int16_t type; int16_t type;
int32_t bytes; // <0 var length, others fixed length bytes int32_t bytes;
uint8_t precision; uint8_t precision;
uint8_t scale; uint8_t scale;
} SUdfColumnMeta; } SUdfColumnMeta;
typedef struct SUdfColumnData { typedef struct SUdfColumnData {
int32_t numOfRows; int32_t numOfRows;
bool varLengthColumn; int32_t rowsAlloc;
union { union {
struct { struct {
int32_t nullBitmapLen; int32_t nullBitmapLen;
...@@ -72,9 +78,10 @@ typedef struct SUdfColumnData { ...@@ -72,9 +78,10 @@ typedef struct SUdfColumnData {
struct { struct {
int32_t varOffsetsLen; int32_t varOffsetsLen;
char *varOffsets; int32_t *varOffsets;
int32_t payloadLen; int32_t payloadLen;
char *payload; char *payload;
int32_t payloadAllocLen;
} varLenCol; } varLenCol;
}; };
} SUdfColumnData; } SUdfColumnData;
...@@ -127,20 +134,166 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); ...@@ -127,20 +134,166 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
// begin API to UDF writer. // begin API to UDF writer.
// dynamic lib init and destroy // dynamic lib init and destroy
typedef int32_t (*TUdfSetupFunc)(); typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfTeardownFunc)(); typedef int32_t (*TUdfDestroyFunc)();
//TODO: add API to check function arguments type, number etc. //TODO: add API to check function arguments type, number etc.
//TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory.
// then UDF framework will free the memory
//typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data);
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); #define UDF_MEMORY_EXP_GROWTH 1.5
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
} while (0)
#define udfColDataSetNotNull_f(pColumn, r_) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
} else {
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
}
}
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
if (udfColDataIsNull_var(pColumn, row)) {
return true;
}
char* data = udfColDataGetData(pColumn, row);
return (*data == TSDB_DATA_TYPE_NULL);
} else {
return udfColDataIsNull_var(pColumn, row);
}
} else {
return udfColDataIsNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
if (newCapacity== 0 || newCapacity <= data->rowsAlloc) {
return TSDB_CODE_SUCCESS;
}
int allocCapacity = TMAX(data->rowsAlloc, 8);
while (allocCapacity < newCapacity) {
allocCapacity *= UDF_MEMORY_EXP_GROWTH;
}
if (IS_VAR_DATA_TYPE(meta->type)) {
char* tmp = taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.varOffsets = (int32_t*)tmp;
data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity;
// for payload, add data in udfColDataAppend
} else {
char* tmp = taosMemoryRealloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.nullBitmap = tmp;
data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity);
if (meta->type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
}
tmp = taosMemoryRealloc(data->fixLenCol.data, allocCapacity* meta->bytes);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.data = tmp;
data->fixLenCol.dataLen = allocCapacity* meta->bytes;
}
data->rowsAlloc = allocCapacity;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row+1);
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row);
} else {
udfColDataSetNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) {
udfColDataSetNull(pColumn, currentRow);
} else {
if (!isVarCol) {
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
} else {
int32_t dataLen = varDataTLen(pData);
if (meta->type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
}
dataLen += CHAR_BYTES;
}
if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) {
uint32_t newSize = data->varLenCol.payloadAllocLen;
if (newSize <= 1) {
newSize = 8;
}
while (newSize < data->varLenCol.payloadLen + dataLen) {
newSize = newSize * UDF_MEMORY_EXP_GROWTH;
}
char *buf = taosMemoryRealloc(data->varLenCol.payload, newSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.payload = buf;
data->varLenCol.payloadAllocLen = newSize;
}
uint32_t len = data->varLenCol.payloadLen;
data->varLenCol.varOffsets[currentRow] = len;
memcpy(data->varLenCol.payload + len, pData, dataLen);
data->varLenCol.payloadLen += dataLen;
}
}
data->numOfRows = TMAX(currentRow + 1, data->numOfRows);
return 0;
}
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf); typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_INDEX_H_ #define _TD_INDEX_H_
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "tarray.h" #include "tarray.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -41,11 +42,23 @@ typedef enum { ...@@ -41,11 +42,23 @@ typedef enum {
UPDATE_VALUE, // update index column value UPDATE_VALUE, // update index column value
ADD_INDEX, // add index on specify column ADD_INDEX, // add index on specify column
DROP_INDEX, // drop existed index DROP_INDEX, // drop existed index
DROP_SATBLE // drop stable DROP_SATBLE, // drop stable
DEFAULT // query
} SIndexOperOnColumn; } SIndexOperOnColumn;
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; typedef enum { MUST = 0, SHOULD, NOT } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3, QUERY_RANGE = 4 } EIndexQueryType; typedef enum {
QUERY_TERM = 0,
QUERY_PREFIX,
QUERY_SUFFIX,
QUERY_REGEX,
QUERY_LESS_THAN,
QUERY_LESS_EQUAL,
QUERY_GREATER_THAN,
QUERY_GREATER_EQUAL,
QUERY_RANGE,
QUERY_MAX
} EIndexQueryType;
/* /*
* create multi query * create multi query
......
...@@ -310,6 +310,29 @@ typedef struct SCreateFunctionStmt { ...@@ -310,6 +310,29 @@ typedef struct SCreateFunctionStmt {
int32_t bufSize; int32_t bufSize;
} SCreateFunctionStmt; } SCreateFunctionStmt;
typedef struct SDropFunctionStmt {
ENodeType type;
char funcName[TSDB_FUNC_NAME_LEN];
bool ignoreNotExists;
} SDropFunctionStmt;
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef struct SGrantStmt {
ENodeType type;
char userName[TSDB_USER_LEN];
char dbName[TSDB_DB_NAME_LEN];
int64_t privileges;
} SGrantStmt;
typedef SGrantStmt SRevokeStmt;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,7 +28,8 @@ extern "C" { ...@@ -28,7 +28,8 @@ extern "C" {
#define LIST_LENGTH(l) (NULL != (l) ? (l)->length : 0) #define LIST_LENGTH(l) (NULL != (l) ? (l)->length : 0)
#define FOREACH(node, list) \ #define FOREACH(node, list) \
for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext) for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); \
(NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext)
#define REPLACE_NODE(newNode) cell->pNode = (SNode*)(newNode) #define REPLACE_NODE(newNode) cell->pNode = (SNode*)(newNode)
...@@ -44,15 +45,25 @@ extern "C" { ...@@ -44,15 +45,25 @@ extern "C" {
#define ERASE_NODE(list) cell = nodesListErase((list), cell) #define ERASE_NODE(list) cell = nodesListErase((list), cell)
#define FORBOTH(node1, list1, node2, list2) \ #define FORBOTH(node1, list1, node2, list2) \
for (SListCell* cell1 = (NULL != (list1) ? (list1)->pHead : NULL), *cell2 = (NULL != (list2) ? (list2)->pHead : NULL); \ for (SListCell* cell1 = (NULL != (list1) ? (list1)->pHead : NULL), \
(NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \ *cell2 = (NULL != (list2) ? (list2)->pHead : NULL); \
(NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), \
(NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), \
(node1 != NULL && node2 != NULL); \
cell1 = cell1->pNext, cell2 = cell2->pNext) cell1 = cell1->pNext, cell2 = cell2->pNext)
#define REPLACE_LIST1_NODE(newNode) cell1->pNode = (SNode*)(newNode) #define REPLACE_LIST1_NODE(newNode) cell1->pNode = (SNode*)(newNode)
#define REPLACE_LIST2_NODE(newNode) cell2->pNode = (SNode*)(newNode) #define REPLACE_LIST2_NODE(newNode) cell2->pNode = (SNode*)(newNode)
#define FOREACH_FOR_REWRITE(node, list) \ #define FOREACH_FOR_REWRITE(node, list) \
for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); (NULL != cell ? (node = &(cell->pNode), true) : (node = NULL, false)); cell = cell->pNext) for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); \
(NULL != cell ? (node = &(cell->pNode), true) : (node = NULL, false)); cell = cell->pNext)
#define DESTORY_LIST(list) \
do { \
nodesDestroyList(list); \
list = NULL; \
} while (0)
typedef enum ENodeType { typedef enum ENodeType {
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, // Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
...@@ -132,6 +143,8 @@ typedef enum ENodeType { ...@@ -132,6 +143,8 @@ typedef enum ENodeType {
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT, QUERY_NODE_REDISTRIBUTE_VGROUP_STMT,
QUERY_NODE_SPLIT_VGROUP_STMT, QUERY_NODE_SPLIT_VGROUP_STMT,
QUERY_NODE_SYNCDB_STMT, QUERY_NODE_SYNCDB_STMT,
QUERY_NODE_GRANT_STMT,
QUERY_NODE_REVOKE_STMT,
QUERY_NODE_SHOW_DNODES_STMT, QUERY_NODE_SHOW_DNODES_STMT,
QUERY_NODE_SHOW_MNODES_STMT, QUERY_NODE_SHOW_MNODES_STMT,
QUERY_NODE_SHOW_MODULES_STMT, QUERY_NODE_SHOW_MODULES_STMT,
...@@ -151,7 +164,6 @@ typedef enum ENodeType { ...@@ -151,7 +164,6 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_TOPICS_STMT, QUERY_NODE_SHOW_TOPICS_STMT,
QUERY_NODE_SHOW_CONSUMERS_STMT, QUERY_NODE_SHOW_CONSUMERS_STMT,
QUERY_NODE_SHOW_SUBSCRIBES_STMT, QUERY_NODE_SHOW_SUBSCRIBES_STMT,
QUERY_NODE_SHOW_TRANS_STMT,
QUERY_NODE_SHOW_SMAS_STMT, QUERY_NODE_SHOW_SMAS_STMT,
QUERY_NODE_SHOW_CONFIGS_STMT, QUERY_NODE_SHOW_CONFIGS_STMT,
QUERY_NODE_SHOW_CONNECTIONS_STMT, QUERY_NODE_SHOW_CONNECTIONS_STMT,
...@@ -163,8 +175,11 @@ typedef enum ENodeType { ...@@ -163,8 +175,11 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_CREATE_DATABASE_STMT, QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT, QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT, QUERY_NODE_SHOW_CREATE_STABLE_STMT,
QUERY_NODE_SHOW_TRANSACTIONS_STMT,
QUERY_NODE_KILL_CONNECTION_STMT, QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT, QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_QUERY,
// logic plan node // logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN, QUERY_NODE_LOGIC_PLAN_SCAN,
...@@ -174,6 +189,7 @@ typedef enum ENodeType { ...@@ -174,6 +189,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF, QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_PLAN_EXCHANGE, QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_PLAN_WINDOW, QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_PLAN_FILL,
QUERY_NODE_LOGIC_PLAN_SORT, QUERY_NODE_LOGIC_PLAN_SORT,
QUERY_NODE_LOGIC_PLAN_PARTITION, QUERY_NODE_LOGIC_PLAN_PARTITION,
QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_SUBPLAN,
...@@ -191,6 +207,8 @@ typedef enum ENodeType { ...@@ -191,6 +207,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_PARTITION, QUERY_NODE_PHYSICAL_PLAN_PARTITION,
...@@ -240,12 +258,7 @@ void nodesDestroyList(SNodeList* pList); ...@@ -240,12 +258,7 @@ void nodesDestroyList(SNodeList* pList);
// Only clear the linked list structure, without releasing the elements inside // Only clear the linked list structure, without releasing the elements inside
void nodesClearList(SNodeList* pList); void nodesClearList(SNodeList* pList);
typedef enum EDealRes { typedef enum EDealRes { DEAL_RES_CONTINUE = 1, DEAL_RES_IGNORE_CHILD, DEAL_RES_ERROR, DEAL_RES_END } EDealRes;
DEAL_RES_CONTINUE = 1,
DEAL_RES_IGNORE_CHILD,
DEAL_RES_ERROR,
DEAL_RES_END
} EDealRes;
typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext); typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext);
void nodesWalkExpr(SNodeptr pNode, FNodeWalker walker, void* pContext); void nodesWalkExpr(SNodeptr pNode, FNodeWalker walker, void* pContext);
...@@ -271,8 +284,8 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode); ...@@ -271,8 +284,8 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode);
int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen); int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen);
int32_t nodesStringToList(const char* pStr, SNodeList** pList); int32_t nodesStringToList(const char* pStr, SNodeList** pList);
int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len); int32_t nodesNodeToSQL(SNode* pNode, char* buf, int32_t bufSize, int32_t* len);
char *nodesGetNameFromColumnNode(SNode *pNode); char* nodesGetNameFromColumnNode(SNode* pNode);
int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots); int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -102,7 +102,6 @@ typedef struct SWindowLogicNode { ...@@ -102,7 +102,6 @@ typedef struct SWindowLogicNode {
int64_t sliding; int64_t sliding;
int8_t intervalUnit; int8_t intervalUnit;
int8_t slidingUnit; int8_t slidingUnit;
SFillNode* pFill;
int64_t sessionGap; int64_t sessionGap;
SNode* pTspk; SNode* pTspk;
SNode* pStateExpr; SNode* pStateExpr;
...@@ -110,6 +109,14 @@ typedef struct SWindowLogicNode { ...@@ -110,6 +109,14 @@ typedef struct SWindowLogicNode {
int64_t watermark; int64_t watermark;
} SWindowLogicNode; } SWindowLogicNode;
typedef struct SFillLogicNode {
SLogicNode node;
EFillMode mode;
SNode* pWStartTs;
SNode* pValues; // SNodeListNode
STimeWindow timeRange;
} SFillLogicNode;
typedef struct SSortLogicNode { typedef struct SSortLogicNode {
SLogicNode node; SLogicNode node;
SNodeList* pSortKeys; SNodeList* pSortKeys;
...@@ -223,10 +230,12 @@ typedef struct SProjectPhysiNode { ...@@ -223,10 +230,12 @@ typedef struct SProjectPhysiNode {
typedef struct SJoinPhysiNode { typedef struct SJoinPhysiNode {
SPhysiNode node; SPhysiNode node;
EJoinType joinType; EJoinType joinType;
SNode* pOnConditions; // in or out tuple ? SNode* pOnConditions;
SNodeList* pTargets; SNodeList* pTargets;
} SJoinPhysiNode; } SJoinPhysiNode;
typedef SJoinPhysiNode SSortMergeJoinPhysiNode;
typedef struct SAggPhysiNode { typedef struct SAggPhysiNode {
SPhysiNode node; SPhysiNode node;
SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function
...@@ -263,9 +272,19 @@ typedef struct SIntervalPhysiNode { ...@@ -263,9 +272,19 @@ typedef struct SIntervalPhysiNode {
int64_t sliding; int64_t sliding;
int8_t intervalUnit; int8_t intervalUnit;
int8_t slidingUnit; int8_t slidingUnit;
SFillNode* pFill;
} SIntervalPhysiNode; } SIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamIntervalPhysiNode;
typedef struct SFillPhysiNode {
SPhysiNode node;
EFillMode mode;
SNode* pWStartTs; // SColumnNode
SNode* pValues; // SNodeListNode
SNodeList* pTargets;
STimeWindow timeRange;
} SFillPhysiNode;
typedef struct SMultiTableIntervalPhysiNode { typedef struct SMultiTableIntervalPhysiNode {
SIntervalPhysiNode interval; SIntervalPhysiNode interval;
SNodeList* pPartitionKeys; SNodeList* pPartitionKeys;
......
...@@ -88,6 +88,7 @@ typedef struct SValueNode { ...@@ -88,6 +88,7 @@ typedef struct SValueNode {
double d; double d;
char* p; char* p;
} datum; } datum;
int64_t typeData;
char unit; char unit;
} SValueNode; } SValueNode;
...@@ -211,6 +212,8 @@ typedef struct SFillNode { ...@@ -211,6 +212,8 @@ typedef struct SFillNode {
ENodeType type; // QUERY_NODE_FILL ENodeType type; // QUERY_NODE_FILL
EFillMode mode; EFillMode mode;
SNode* pValues; // SNodeListNode SNode* pValues; // SNodeListNode
SNode* pWStartTs; // _wstartts pseudo column
STimeWindow timeRange;
} SFillNode; } SFillNode;
typedef struct SSelectStmt { typedef struct SSelectStmt {
...@@ -230,6 +233,8 @@ typedef struct SSelectStmt { ...@@ -230,6 +233,8 @@ typedef struct SSelectStmt {
uint8_t precision; uint8_t precision;
bool isEmptyResult; bool isEmptyResult;
bool hasAggFuncs; bool hasAggFuncs;
bool hasRepeatScanFuncs;
bool isTimeOrderQuery;
} SSelectStmt; } SSelectStmt;
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType; typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;
...@@ -290,16 +295,48 @@ typedef struct SExplainStmt { ...@@ -290,16 +295,48 @@ typedef struct SExplainStmt {
SNode* pQuery; SNode* pQuery;
} SExplainStmt; } SExplainStmt;
typedef struct SCmdMsgInfo {
int16_t msgType;
SEpSet epSet;
void* pMsg;
int32_t msgLen;
void* pExtension; // todo remove it soon
} SCmdMsgInfo;
typedef enum EQueryExecMode {
QUERY_EXEC_MODE_LOCAL = 1,
QUERY_EXEC_MODE_RPC,
QUERY_EXEC_MODE_SCHEDULE,
QUERY_EXEC_MODE_EMPTY_RESULT
} EQueryExecMode;
typedef struct SQuery {
ENodeType type;
EQueryExecMode execMode;
bool haveResultSet;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
int8_t precision;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
SArray* pDbList;
SArray* pTableList;
bool showRewrite;
int32_t placeholderNum;
} SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext); void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext);
typedef enum ECollectColType { COLLECT_COL_TYPE_COL = 1, COLLECT_COL_TYPE_TAG, COLLECT_COL_TYPE_ALL } ECollectColType; typedef enum ECollectColType { COLLECT_COL_TYPE_COL = 1, COLLECT_COL_TYPE_TAG, COLLECT_COL_TYPE_ALL } ECollectColType;
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type,
SNodeList** pCols); SNodeList** pCols);
typedef bool (*FFuncClassifier)(int32_t funcId); typedef bool (*FFuncClassifier)(int32_t funcId);
int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs); int32_t nodesCollectFuncs(SSelectStmt* pSelect, ESqlClause clause, FFuncClassifier classifier, SNodeList** pFuncs);
int32_t nodesCollectSpecialNodes(SSelectStmt* pSelect, ESqlClause clause, ENodeType type, SNodeList** pNodes);
bool nodesIsExprNode(const SNode* pNode); bool nodesIsExprNode(const SNode* pNode);
...@@ -312,6 +349,7 @@ bool nodesIsTimeorderQuery(const SNode* pQuery); ...@@ -312,6 +349,7 @@ bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery); bool nodesIsTimelineQuery(const SNode* pQuery);
void* nodesGetValueFromNode(SValueNode* pNode); void* nodesGetValueFromNode(SValueNode* pNode);
int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value);
char* nodesGetStrValueFromNode(SValueNode* pNode); char* nodesGetStrValueFromNode(SValueNode* pNode);
char* getFillModeString(EFillMode mode); char* getFillModeString(EFillMode mode);
void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal); void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal);
......
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#include "querynodes.h"
#include "query.h" #include "query.h"
#include "querynodes.h"
typedef struct SStmtCallback { typedef struct SStmtCallback {
TAOS_STMT* pStmt; TAOS_STMT* pStmt;
...@@ -34,48 +34,20 @@ typedef struct SStmtCallback { ...@@ -34,48 +34,20 @@ typedef struct SStmtCallback {
typedef struct SParseContext { typedef struct SParseContext {
uint64_t requestId; uint64_t requestId;
int32_t acctId; int32_t acctId;
const char *db; const char* db;
bool topicQuery; bool topicQuery;
void *pTransporter; void* pTransporter;
SEpSet mgmtEpSet; SEpSet mgmtEpSet;
const char *pSql; // sql string const char* pSql; // sql string
size_t sqlLen; // length of the sql string size_t sqlLen; // length of the sql string
char *pMsg; // extended error message if exists to help identifying the problem in sql statement. char* pMsg; // extended error message if exists to help identifying the problem in sql statement.
int32_t msgLen; // max length of the msg int32_t msgLen; // max length of the msg
struct SCatalog *pCatalog; struct SCatalog* pCatalog;
SStmtCallback *pStmtCb; SStmtCallback* pStmtCb;
const char* pUser;
bool isSuperUser;
} SParseContext; } SParseContext;
typedef struct SCmdMsgInfo {
int16_t msgType;
SEpSet epSet;
void* pMsg;
int32_t msgLen;
void* pExtension; // todo remove it soon
} SCmdMsgInfo;
typedef enum EQueryExecMode {
QUERY_EXEC_MODE_LOCAL = 1,
QUERY_EXEC_MODE_RPC,
QUERY_EXEC_MODE_SCHEDULE,
QUERY_EXEC_MODE_EMPTY_RESULT
} EQueryExecMode;
typedef struct SQuery {
EQueryExecMode execMode;
bool haveResultSet;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
int8_t precision;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
SArray* pDbList;
SArray* pTableList;
bool showRewrite;
int32_t placeholderNum;
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
bool isInsertSql(const char* pStr, size_t length); bool isInsertSql(const char* pStr, size_t length);
...@@ -89,14 +61,23 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc); ...@@ -89,14 +61,23 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc);
void qFreeStmtDataBlock(void* pDataBlock); void qFreeStmtDataBlock(void* pDataBlock);
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc); int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc);
void qDestroyStmtDataBlock(void* pBlock); void qDestroyStmtDataBlock(void* pBlock);
int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen);
int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum); int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
int32_t qBuildStmtColFields(void *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields); int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields); int32_t rowNum);
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen); int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD** fields);
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields);
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind,
char* msgBuf, int32_t msgBufLen);
void destroyBoundColumnInfo(void* pBoundInfo); void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen); int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen);
void* smlInitHandle(SQuery* pQuery);
void smlDestroyHandle(void* pHandle);
int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* colsSchema, SArray* cols, bool format,
STableMeta* pTableMeta, char* tableName, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -35,8 +35,6 @@ typedef struct SPlanContext { ...@@ -35,8 +35,6 @@ typedef struct SPlanContext {
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
int32_t placeholderNum; int32_t placeholderNum;
void* pTransporter;
struct SCatalog* pCatalog;
char* pMsg; char* pMsg;
int32_t msgLen; int32_t msgLen;
} SPlanContext; } SPlanContext;
......
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
#include "tmsg.h" #include "tmsg.h"
#include "tmsgcb.h"
typedef enum { typedef enum {
JOB_TASK_STATUS_NULL = 0, JOB_TASK_STATUS_NULL = 0,
...@@ -149,7 +150,8 @@ int32_t cleanupTaskQueue(); ...@@ -149,7 +150,8 @@ int32_t cleanupTaskQueue();
*/ */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx); int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
bool persistHandle, void* ctx);
/** /**
* Asynchronously send message to server, after the response received, the callback will be incured. * Asynchronously send message to server, after the response received, the callback will be incured.
...@@ -182,7 +184,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -182,7 +184,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID || (_code) == TSDB_CODE_VND_TB_NOT_EXIST) #define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) #define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED) #define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) #define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
......
...@@ -91,6 +91,8 @@ int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -91,6 +91,8 @@ int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
此差异已折叠。
...@@ -36,7 +36,7 @@ typedef struct SUpdateInfo { ...@@ -36,7 +36,7 @@ typedef struct SUpdateInfo {
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoDestroy(SUpdateInfo *pInfo);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -149,6 +149,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); ...@@ -149,6 +149,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid); const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncGetVgId(int64_t rid);
typedef enum { typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0, TAOS_SYNC_PROPOSE_SUCCESS = 0,
......
...@@ -68,6 +68,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha ...@@ -68,6 +68,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha
typedef bool (*RpcRfp)(int32_t code); typedef bool (*RpcRfp)(int32_t code);
typedef struct SRpcInit { typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port uint16_t localPort; // local port
char * label; // for debug purpose char * label; // for debug purpose
int numOfThreads; // number of threads to handle connections int numOfThreads; // number of threads to handle connections
......
...@@ -208,6 +208,7 @@ int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readN ...@@ -208,6 +208,7 @@ int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readN
int64_t walGetFirstVer(SWal *); int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *); int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *); int64_t walGetLastVer(SWal *);
int64_t walGetCommittedVer(SWal *);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册