提交 0cb811a5 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into feature/3.0_wxy

......@@ -6,40 +6,14 @@ project(
DESCRIPTION "An open-source big data platform designed and optimized for the Internet of Things(IOT)"
)
IF ("${BUILD_TOOLS}" STREQUAL "")
IF (TD_LINUX)
IF (TD_ARM_32)
SET(BUILD_TOOLS "false")
ELSEIF (TD_ARM_64)
SET(BUILD_TOOLS "false")
ELSE ()
SET(BUILD_TOOLS "false")
ENDIF ()
ELSEIF (TD_DARWIN)
SET(BUILD_TOOLS "false")
ELSE ()
SET(BUILD_TOOLS "false")
ENDIF ()
ENDIF ()
IF ("${BUILD_TOOLS}" MATCHES "false")
MESSAGE("${Yellow} Will _not_ build taos_tools! ${ColourReset}")
SET(TD_TAOS_TOOLS FALSE)
ELSE ()
MESSAGE("")
MESSAGE("${Green} Will build taos_tools! ${ColourReset}")
MESSAGE("")
SET(TD_TAOS_TOOLS TRUE)
ENDIF ()
set(CMAKE_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/cmake")
set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib")
include(${CMAKE_SUPPORT_DIR}/cmake.platform)
include(${CMAKE_SUPPORT_DIR}/cmake.define)
include(${CMAKE_SUPPORT_DIR}/cmake.options)
include(${CMAKE_SUPPORT_DIR}/cmake.version)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -fPIC -gdwarf-2 -msse4.2 -mfma -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -fPIC -gdwarf-2 -msse4.2 -mfma -g3")
# contrib
add_subdirectory(contrib)
......@@ -52,6 +26,7 @@ if(${BUILD_TEST})
include(CTest)
enable_testing()
endif(${BUILD_TEST})
add_subdirectory(source)
add_subdirectory(tools)
add_subdirectory(tests)
......
cmake_minimum_required(VERSION 3.16)
IF ("${BUILD_TOOLS}" STREQUAL "")
IF (TD_LINUX)
IF (TD_ARM_32)
SET(BUILD_TOOLS "false")
ELSEIF (TD_ARM_64)
SET(BUILD_TOOLS "false")
ELSE ()
SET(BUILD_TOOLS "false")
ENDIF ()
ELSEIF (TD_DARWIN)
SET(BUILD_TOOLS "false")
ELSE ()
SET(BUILD_TOOLS "false")
ENDIF ()
ENDIF ()
IF ("${BUILD_TOOLS}" MATCHES "false")
MESSAGE("${Yellow} Will _not_ build taos_tools! ${ColourReset}")
SET(TD_TAOS_TOOLS FALSE)
ELSE ()
MESSAGE("")
MESSAGE("${Green} Will build taos_tools! ${ColourReset}")
MESSAGE("")
SET(TD_TAOS_TOOLS TRUE)
ENDIF ()
IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE)
SET(COMMON_FLAGS "/nologo /WX /wd4018 /wd4999 /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-")
IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
ENDIF ()
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -fPIC -gdwarf-2 -msse4.2 -mfma -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -fPIC -gdwarf-2 -msse4.2 -mfma -g3")
ENDIF ()
# =========================================================
# Deps options
# =========================================================
IF(${TD_WINDOWS})
MESSAGE("build pthread Win32")
option(
BUILD_PTHREAD
"If build pthread on Windows"
ON
)
MESSAGE("build gnu regex for Windows")
option(
BUILD_GNUREGEX
"If build gnu regex on Windows"
ON
)
ENDIF ()
IF(${TD_LINUX} MATCHES TRUE)
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
ENDIF ()
option(
BUILD_WITH_LEVELDB
"If build with leveldb"
......@@ -25,11 +48,16 @@ option(
OFF
)
option(
BUILD_WITH_BDB
"If build with BerkleyDB"
ON
)
IF(${TD_WINDOWS})
MESSAGE("Not build BDB on Windows")
ELSE ()
option(
BUILD_WITH_BDB
"If build with BerkleyDB"
ON
)
ENDIF ()
option(
BUILD_WITH_LUCENE
......@@ -68,12 +96,16 @@ option(
OFF
)
IF(${TD_LINUX} MATCHES TRUE)
option(
BUILD_DEPENDENCY_TESTS
"If build dependency tests"
ON
)
ENDIF ()
option(
BUILD_DOCS
"If use doxygen build documents"
......
cmake_minimum_required(VERSION 3.16)
MESSAGE("Current system is ${CMAKE_SYSTEM_NAME}")
# init
SET(TD_LINUX FALSE)
SET(TD_WINDOWS FALSE)
SET(TD_DARWIN FALSE)
IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
SET(TD_LINUX TRUE)
SET(OSTYPE "Linux")
ADD_DEFINITIONS("-DLINUX")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_LINUX_64 TRUE)
ELSE ()
SET(TD_LINUX_32 TRUE)
ENDIF ()
ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
SET(TD_DARWIN TRUE)
SET(OSTYPE "macOS")
ADD_DEFINITIONS("-DDARWIN")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64")
MESSAGE("Current system arch is arm64")
SET(TD_DARWIN_64 TRUE)
ADD_DEFINITIONS("-D_TD_DARWIN_64")
ENDIF ()
ADD_DEFINITIONS("-DHAVE_UNISTD_H")
ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
SET(TD_WINDOWS TRUE)
SET(OSTYPE "Windows")
ADD_DEFINITIONS("-DWINDOWS")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_WINDOWS_64 TRUE)
ADD_DEFINITIONS("-D_TD_WINDOWS_64")
ELSE ()
SET(TD_WINDOWS_32 TRUE)
ADD_DEFINITIONS("-D_TD_WINDOWS_32")
ENDIF ()
ENDIF()
MESSAGE("C Compiler ID: ${CMAKE_C_COMPILER_ID}")
MESSAGE("CXX Compiler ID: ${CMAKE_CXX_COMPILER_ID}")
# gnuregex
ExternalProject_Add(gnuregex
URL https://launchpad.net/gnuregex/trunk/2.9/+download/libgnurx-src-2.9.zip
DOWNLOAD_NAME libgnurx-src.zip
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/gnuregex"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
# pthread
ExternalProject_Add(pthread
GIT_REPOSITORY https://github.com/GerHobbelt/pthread-win32
GIT_TAG v3.0.3.1
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/pthread-win32"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
......@@ -9,6 +9,16 @@ endfunction(cat IN_FILE OUT_FILE)
set(CONTRIB_TMP_FILE "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in")
configure_file("${CMAKE_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# pthread
if(${BUILD_PTHREAD})
cat("${CMAKE_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# gnu regex
if(${BUILD_GNUREGEX})
cat("${CMAKE_SUPPORT_DIR}/gnuregex_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# googletest
if(${BUILD_TEST})
cat("${CMAKE_SUPPORT_DIR}/gtest_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
......@@ -193,7 +203,10 @@ endif(${BUILD_WITH_TRAFT})
# LIBUV
if(${BUILD_WITH_UV})
add_compile_options(-Wno-sign-compare)
if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows")
MESSAGE("Windows need set no-sign-compare")
add_compile_options(-Wno-sign-compare)
endif ()
add_subdirectory(libuv)
endif(${BUILD_WITH_UV})
......@@ -224,6 +237,7 @@ if(${BUILD_WITH_SQLITE})
)
endif(${BUILD_WITH_SQLITE})
# pthread
# ================================================================================================
......
......@@ -31,27 +31,27 @@ typedef void TAOS_SUB;
typedef void **TAOS_ROW;
// Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
#define TSDB_DATA_TYPE_INT 4 // 4 bytes
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 15 // string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_JSON 17 // json
#define TSDB_DATA_TYPE_DECIMAL 18 // decimal
#define TSDB_DATA_TYPE_BLOB 19 // binary
#define TSDB_DATA_TYPE_MEDIUMBLOB 20
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
#define TSDB_DATA_TYPE_INT 4 // 4 bytes
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_JSON 15 // json string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_DECIMAL 17 // decimal
#define TSDB_DATA_TYPE_BLOB 18 // binary
#define TSDB_DATA_TYPE_MEDIUMBLOB 19
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
typedef enum {
TSDB_OPTION_LOCALE,
......
......@@ -1356,6 +1356,7 @@ typedef struct SVCreateTbReq {
} SVCreateTbReq, SVUpdateTbReq;
typedef struct {
int tmp; // TODO: to avoid compile error
} SVCreateTbRsp, SVUpdateTbRsp;
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
......@@ -1367,6 +1368,7 @@ typedef struct {
} SVCreateTbBatchReq;
typedef struct {
int tmp; // TODO: to avoid compile error
} SVCreateTbBatchRsp;
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
......@@ -1380,6 +1382,7 @@ typedef struct {
} SVDropTbReq;
typedef struct {
int tmp; // TODO: to avoid compile error
} SVDropTbRsp;
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
......@@ -1895,24 +1898,19 @@ typedef enum {
} ETDTimeUnit;
typedef struct {
uint16_t funcId;
uint16_t nColIds;
col_id_t* colIds; // sorted colIds
} SFuncColIds;
typedef struct {
uint8_t version; // for compatibility
uint8_t intervalUnit;
uint8_t slidingUnit;
char indexName[TSDB_INDEX_NAME_LEN];
char timezone[TD_TIMEZONE_LEN];
uint16_t nFuncColIds;
uint16_t tagsFilterLen;
tb_uid_t tableUid; // super/common table uid
int64_t interval;
int64_t sliding;
SFuncColIds* funcColIds; // sorted funcIds
char* tagsFilter;
int8_t version; // for compatibility(default 0)
int8_t intervalUnit;
int8_t slidingUnit;
char indexName[TSDB_INDEX_NAME_LEN];
char timezone[TD_TIMEZONE_LEN]; // sma data is invalid if timezone change.
uint16_t exprLen;
uint16_t tagsFilterLen;
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
int64_t sliding;
char* expr; // sma expression
char* tagsFilter;
} STSma; // Time-range-wise SMA
typedef struct {
......@@ -1930,7 +1928,9 @@ typedef struct {
int64_t ver; // use a general definition
char indexName[TSDB_INDEX_NAME_LEN];
} SVDropTSmaReq;
typedef struct {
int tmp; // TODO: to avoid compile error
} SVCreateTSmaRsp, SVDropTSmaRsp;
int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq);
......@@ -1939,24 +1939,30 @@ int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq);
void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq);
typedef struct {
STimeWindow tsWindow; // [skey, ekey]
uint64_t tableUid; // sub/common table uid
int32_t numOfBlocks; // number of sma blocks for each column, total number is numOfBlocks*numOfColId
int32_t dataLen; // total data length
col_id_t* colIds; // e.g. 2,4,9,10
col_id_t numOfColIds; // e.g. 4
char data[]; // the sma blocks
} STSmaData;
// TODO: move to the final location afte schema of STSma/STSmaData defined
static FORCE_INLINE void tdDestroySmaData(STSmaData* pSmaData) {
if (pSmaData) {
if (pSmaData->colIds) {
tfree(pSmaData->colIds);
}
tfree(pSmaData);
}
}
col_id_t colId;
uint16_t blockSize; // sma data block size
char data[];
} STSmaColData;
typedef struct {
tb_uid_t tableUid; // super/child/normal table uid
int32_t dataLen; // not including head
char data[];
} STSmaTbData;
typedef struct {
int64_t indexUid;
TSKEY skey; // startTS of one interval/sliding
int64_t interval;
int32_t dataLen; // not including head
int8_t intervalUnit;
char data[];
} STSmaDataWrapper; // sma data for a interval/sliding window
// interval/sliding => window
// => window->table->colId
// => 当一个window下所有的表均计算完成时,流计算告知tsdb清除window的过期标记
// RSma: Rollup SMA
typedef struct {
......@@ -1979,13 +1985,7 @@ typedef struct {
static FORCE_INLINE void tdDestroyTSma(STSma* pSma) {
if (pSma) {
if (pSma->funcColIds != NULL) {
for (uint16_t i = 0; i < pSma->nFuncColIds; ++i) {
tfree((pSma->funcColIds + i)->colIds);
}
tfree(pSma->funcColIds);
}
tfree(pSma->expr);
tfree(pSma->tagsFilter);
}
}
......@@ -2004,24 +2004,20 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
int32_t tlen = 0;
tlen += taosEncodeFixedU8(buf, pSma->version);
tlen += taosEncodeFixedU8(buf, pSma->intervalUnit);
tlen += taosEncodeFixedU8(buf, pSma->slidingUnit);
tlen += taosEncodeFixedI8(buf, pSma->version);
tlen += taosEncodeFixedI8(buf, pSma->intervalUnit);
tlen += taosEncodeFixedI8(buf, pSma->slidingUnit);
tlen += taosEncodeString(buf, pSma->indexName);
tlen += taosEncodeString(buf, pSma->timezone);
tlen += taosEncodeFixedU16(buf, pSma->nFuncColIds);
tlen += taosEncodeFixedU16(buf, pSma->exprLen);
tlen += taosEncodeFixedU16(buf, pSma->tagsFilterLen);
tlen += taosEncodeFixedI64(buf, pSma->indexUid);
tlen += taosEncodeFixedI64(buf, pSma->tableUid);
tlen += taosEncodeFixedI64(buf, pSma->interval);
tlen += taosEncodeFixedI64(buf, pSma->sliding);
for (uint16_t i = 0; i < pSma->nFuncColIds; ++i) {
SFuncColIds* funcColIds = pSma->funcColIds + i;
tlen += taosEncodeFixedU16(buf, funcColIds->funcId);
tlen += taosEncodeFixedU16(buf, funcColIds->nColIds);
for (uint16_t j = 0; j < funcColIds->nColIds; ++j) {
tlen += taosEncodeFixedU16(buf, *(funcColIds->colIds + j));
}
if (pSma->exprLen > 0) {
tlen += taosEncodeString(buf, pSma->expr);
}
if (pSma->tagsFilterLen > 0) {
......@@ -2042,43 +2038,30 @@ static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* p
}
static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf = taosDecodeFixedU8(buf, &pSma->version);
buf = taosDecodeFixedU8(buf, &pSma->intervalUnit);
buf = taosDecodeFixedU8(buf, &pSma->slidingUnit);
buf = taosDecodeFixedI8(buf, &pSma->version);
buf = taosDecodeFixedI8(buf, &pSma->intervalUnit);
buf = taosDecodeFixedI8(buf, &pSma->slidingUnit);
buf = taosDecodeStringTo(buf, pSma->indexName);
buf = taosDecodeStringTo(buf, pSma->timezone);
buf = taosDecodeFixedU16(buf, &pSma->nFuncColIds);
buf = taosDecodeFixedU16(buf, &pSma->exprLen);
buf = taosDecodeFixedU16(buf, &pSma->tagsFilterLen);
buf = taosDecodeFixedI64(buf, &pSma->indexUid);
buf = taosDecodeFixedI64(buf, &pSma->tableUid);
buf = taosDecodeFixedI64(buf, &pSma->interval);
buf = taosDecodeFixedI64(buf, &pSma->sliding);
if (pSma->nFuncColIds > 0) {
pSma->funcColIds = (SFuncColIds*)calloc(pSma->nFuncColIds, sizeof(SFuncColIds));
if (pSma->funcColIds == NULL) {
if (pSma->exprLen > 0) {
pSma->expr = (char*)calloc(pSma->exprLen, 1);
if (pSma->expr != NULL) {
buf = taosDecodeStringTo(buf, pSma->expr);
} else {
tdDestroyTSma(pSma);
return NULL;
}
for (uint16_t i = 0; i < pSma->nFuncColIds; ++i) {
SFuncColIds* funcColIds = pSma->funcColIds + i;
buf = taosDecodeFixedU16(buf, &funcColIds->funcId);
buf = taosDecodeFixedU16(buf, &funcColIds->nColIds);
if (funcColIds->nColIds > 0) {
funcColIds->colIds = (col_id_t*)calloc(funcColIds->nColIds, sizeof(col_id_t));
if (funcColIds->colIds != NULL) {
for (uint16_t j = 0; j < funcColIds->nColIds; ++j) {
buf = taosDecodeFixedU16(buf, funcColIds->colIds + j);
}
} else {
tdDestroyTSma(pSma);
return NULL;
}
} else {
funcColIds->colIds = NULL;
}
}
} else {
pSma->funcColIds = NULL;
pSma->expr = NULL;
}
if (pSma->tagsFilterLen > 0) {
......
......@@ -103,6 +103,7 @@ typedef struct {
typedef struct {
// TODO
int tmp; // TODO: to avoid compile error
} STpRow; // tuple
#pragma pack(push, 1)
......@@ -1098,4 +1099,4 @@ const STSRow *tRowBatchIterNext(STSRowBatchIter *pRowBatchIter);
}
#endif
#endif /*_TD_COMMON_ROW_H_*/
\ No newline at end of file
#endif /*_TD_COMMON_ROW_H_*/
......@@ -41,7 +41,6 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef struct SInputData {
const struct SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap;
} SInputData;
typedef struct SOutputData {
......
......@@ -29,7 +29,6 @@ extern "C" {
extern int tsRpcHeadSize;
typedef struct SRpcPush SRpcPush;
typedef struct SRpcConnInfo {
uint32_t clientIp;
......@@ -45,16 +44,8 @@ typedef struct SRpcMsg {
int32_t code;
void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client
int persist; // keep handle or not, default 0
SRpcPush *push;
} SRpcMsg;
typedef struct SRpcPush {
void *arg;
int (*callback)(void *arg, SRpcMsg *rpcMsg);
} SRpcPush;
typedef struct SRpcInit {
uint16_t localPort; // local port
......
......@@ -22,7 +22,22 @@ extern "C" {
#include <assert.h>
#include <ctype.h>
#if !defined(WINDOWS)
#include <unistd.h>
#include <dirent.h>
#include <regex.h>
#include <sched.h>
#include <wordexp.h>
#include <libgen.h>
#include <sys/utsname.h>
#include <sys/param.h>
#include <sys/mman.h>
#include <sys/prctl.h>
#endif
#include <errno.h>
#include <fcntl.h>
#include <float.h>
......@@ -30,8 +45,6 @@ extern "C" {
#include <limits.h>
#include <locale.h>
#include <math.h>
#include <regex.h>
#include <sched.h>
#include <setjmp.h>
#include <signal.h>
#include <stdarg.h>
......@@ -43,16 +56,9 @@ extern "C" {
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/utsname.h>
#include <sys/param.h>
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>
#include <wordexp.h>
#include <libgen.h>
#include <sys/mman.h>
#include <sys/prctl.h>
#include "osAtomic.h"
#include "osDef.h"
......
......@@ -49,7 +49,7 @@ extern "C" {
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#if defined(WINDOWS)
char *stpcpy (char *dest, const char *src);
char *stpncpy (char *dest, const char *src, size_t n);
......@@ -63,16 +63,46 @@ extern "C" {
#define strtok_r strtok_s
#define snprintf _snprintf
#define in_addr_t unsigned long
#define socklen_t int
// #define socklen_t int
struct tm *localtime_r(const time_t *timep, struct tm *result);
char * strptime(const char *buf, const char *fmt, struct tm *tm);
char * strsep(char **stringp, const char *delim);
char * getpass(const char *prefix);
char * strndup(const char *s, size_t n);
#endif
int gettimeofday(struct timeval *ptv, void *pTimeZone);
// for send function in tsocket.c
#define MSG_NOSIGNAL 0
#define SO_NO_CHECK 0x1234
#define SOL_TCP 0x1234
#define SHUT_RDWR SD_BOTH
#define SHUT_RD SD_RECEIVE
#define SHUT_WR SD_SEND
#define LOCK_EX 1
#define LOCK_NB 2
#define LOCK_UN 3
#ifndef PATH_MAX
#define PATH_MAX 256
#endif
typedef struct {
int we_wordc;
char *we_wordv[1];
int we_offs;
char wordPos[1025];
} wordexp_t;
int wordexp(char *words, wordexp_t *pwordexp, int flags);
void wordfree(wordexp_t *pwordexp);
#define openlog(a, b, c)
#define closelog()
#define LOG_ERR 0
#define LOG_INFO 1
void syslog(int unused, const char *format, ...);
#endif // WINDOWS
#ifndef WINDOWS
#ifndef O_BINARY
#define O_BINARY 0
......@@ -166,7 +196,7 @@ extern "C" {
#define PRIzu "zu"
#endif
#if defined(_TD_LINUX_64) || defined(_TD_LINUX_32) || defined(_TD_MIPS_64) || defined(_TD_ARM_32) || defined(_TD_ARM_64) || defined(_TD_DARWIN_64)
#if !defined(WINDOWS)
#if defined(_TD_DARWIN_64)
// MacOS
#if !defined(_GNU_SOURCE)
......@@ -181,8 +211,7 @@ extern "C" {
#endif
#else
// Windows
// #define setThreadName(name)
#define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0)
#define setThreadName(name)
#endif
#if defined(_WIN32)
......@@ -199,4 +228,4 @@ extern "C" {
}
#endif
#endif /*_TD_OS_DEF_H_*/
\ No newline at end of file
#endif /*_TD_OS_DEF_H_*/
......@@ -22,6 +22,15 @@ extern "C" {
#include "osSocket.h"
#if defined(WINDOWS)
typedef int32_t FileFd;
typedef SOCKET SocketFd;
#else
typedef int32_t FileFd;
typedef int32_t SocketFd;
#endif
int64_t taosRead(FileFd fd, void *buf, int64_t count);
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define open OPEN_FUNC_TAOS_FORBID
......@@ -76,7 +85,13 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count);
int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset);
int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count);
void taosFprintfFile(TdFilePtr pFile, const char *format, ...);
#if defined(WINDOWS)
#define __restrict__
#endif // WINDOWS
int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict__ ptrBuf);
int32_t taosEOFFile(TdFilePtr pFile);
int64_t taosCloseFile(TdFilePtr *ppFile);
......
......@@ -16,12 +16,13 @@
#ifndef _TD_OS_SEMPHONE_H_
#define _TD_OS_SEMPHONE_H_
#include <semaphore.h>
#ifdef __cplusplus
extern "C" {
#endif
#include <pthread.h>
#include <semaphore.h>
#if defined (_TD_DARWIN_64)
typedef struct tsem_s *tsem_t;
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
......
......@@ -27,7 +27,7 @@
#define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#if defined(WINDOWS)
#include "winsock2.h"
#include <WS2tcpip.h>
#include <winbase.h>
......
......@@ -52,6 +52,13 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen);
char *taosGetCmdlineByPID(int32_t pid);
void taosSetCoreDump(bool enable);
#if defined(WINDOWS)
#define _UTSNAME_LENGTH 65
#define _UTSNAME_MACHINE_LENGTH _UTSNAME_LENGTH
#endif // WINDOWS
typedef struct {
char sysname[_UTSNAME_MACHINE_LENGTH];
char nodename[_UTSNAME_MACHINE_LENGTH];
......
......@@ -20,7 +20,19 @@
extern "C" {
#endif
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define strptime STRPTIME_FUNC_TAOS_FORBID
#define gettimeofday GETTIMEOFDAY_FUNC_TAOS_FORBID
#define localtime_s LOCALTIMES_FUNC_TAOS_FORBID
#define localtime_r LOCALTIMER_FUNC_TAOS_FORBID
#define time TIME_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#define CLOCK_REALTIME 0
#ifdef _TD_GO_DLL_
#define MILLISECOND_PER_SECOND (1000LL)
#else
......@@ -61,6 +73,9 @@ static FORCE_INLINE int64_t taosGetTimestampNs() {
return (int64_t)systemTime.tv_sec * 1000000000L + (int64_t)systemTime.tv_nsec;
}
char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm);
struct tm *taosLocalTime(const time_t *timep, struct tm *result);
#ifdef __cplusplus
}
#endif
......
......@@ -20,6 +20,15 @@
extern "C" {
#endif
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define timer_create TIMER_CREATE_FUNC_TAOS_FORBID
#define timer_settime TIMER_SETTIME_FUNC_TAOS_FORBID
#define timer_delete TIMER_DELETE_FUNC_TAOS_FORBID
#define timeSetEvent TIMESETEVENT_SETTIME_FUNC_TAOS_FORBID
#define timeKillEvent TIMEKILLEVENT_SETTIME_FUNC_TAOS_FORBID
#endif
#define MSECONDS_PER_TICK 5
int32_t taosInitTimer(void (*callback)(int32_t), int32_t ms);
......
......@@ -4,6 +4,10 @@ target_include_directories(
common
PUBLIC "${CMAKE_SOURCE_DIR}/include/common"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
IF(${TD_WINDOWS})
PRIVATE "${CMAKE_SOURCE_DIR}/contrib/pthread-win32"
PRIVATE "${CMAKE_SOURCE_DIR}/contrib/gnuregex"
ENDIF ()
)
target_link_libraries(
common
......
......@@ -61,7 +61,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
}
struct tm tm;
time_t t = (time_t)start;
localtime_r(&t, &tm);
taosLocalTime(&t, &tm);
tm.tm_sec = 0;
tm.tm_min = 0;
tm.tm_hour = 0;
......
......@@ -69,7 +69,7 @@ static int64_t m_deltaUtc = 0;
void deltaToUtcInitOnce() {
struct tm tm = {0};
(void)strptime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
(void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
m_deltaUtc = (int64_t)mktime(&tm);
// printf("====delta:%lld\n\n", seconds);
}
......@@ -236,9 +236,9 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch
char* str;
if (delim == 'T') {
str = strptime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
} else if (delim == 0) {
str = strptime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
} else {
str = NULL;
}
......@@ -303,7 +303,7 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
*time = 0;
struct tm tm = {0};
char* str = strptime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
if (str == NULL) {
return -1;
}
......@@ -338,7 +338,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
struct tm tm = {0};
tm.tm_isdst = -1;
char* str = strptime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
if (str == NULL) {
return -1;
}
......@@ -466,7 +466,7 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
struct tm tm;
time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision));
localtime_r(&tt, &tm);
taosLocalTime(&tt, &tm);
int32_t mon = tm.tm_year * 12 + tm.tm_mon + (int32_t)duration;
tm.tm_year = mon / 12;
tm.tm_mon = mon % 12;
......@@ -489,11 +489,11 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char
struct tm tm;
time_t t = (time_t)skey;
localtime_r(&t, &tm);
taosLocalTime(&t, &tm);
int32_t smon = tm.tm_year * 12 + tm.tm_mon;
t = (time_t)ekey;
localtime_r(&t, &tm);
taosLocalTime(&t, &tm);
int32_t emon = tm.tm_year * 12 + tm.tm_mon;
if (unit == 'y') {
......@@ -514,7 +514,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
start /= (int64_t)(TSDB_TICK_PER_SECOND(precision));
struct tm tm;
time_t tt = (time_t)start;
localtime_r(&tt, &tm);
taosLocalTime(&tt, &tm);
tm.tm_sec = 0;
tm.tm_min = 0;
tm.tm_hour = 0;
......@@ -597,13 +597,13 @@ const char* fmtts(int64_t ts) {
if (ts > -62135625943 && ts < 32503651200) {
time_t t = (time_t)ts;
localtime_r(&t, &tm);
taosLocalTime(&t, &tm);
pos += strftime(buf + pos, sizeof(buf), "s=%Y-%m-%d %H:%M:%S", &tm);
}
if (ts > -62135625943000 && ts < 32503651200000) {
time_t t = (time_t)(ts / 1000);
localtime_r(&t, &tm);
taosLocalTime(&t, &tm);
if (pos > 0) {
buf[pos++] = ' ';
buf[pos++] = '|';
......@@ -615,7 +615,7 @@ const char* fmtts(int64_t ts) {
{
time_t t = (time_t)(ts / 1000000);
localtime_r(&t, &tm);
taosLocalTime(&t, &tm);
if (pos > 0) {
buf[pos++] = ' ';
buf[pos++] = '|';
......
......@@ -58,8 +58,8 @@ STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
STSma * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName);
STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid);
STSma * metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid);
STSmaWrapper * metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
SArray * metaGetSmaTbUids(SMeta *pMeta, bool isDup);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
......
......@@ -89,24 +89,21 @@ int tsdbCommit(STsdb *pTsdb);
/**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData);
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
/**
* @brief Insert RSma(Time-range-wise Rollup SMA) data.
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
// STsdbCfg
int tsdbOptionsInit(STsdbCfg *);
......
......@@ -42,17 +42,14 @@ typedef struct {
typedef struct {
STsdbFSMeta meta; // FS meta
SArray * df; // data file array
// SArray * v2t100.index_name
SArray * smaf; // sma data file array v2t1900.index_name
SArray * sf; // sma data file array v2(t|r)1900.index_name_1
} SFSStatus;
/**
* @brief Directory structure of .tsma data files.
*
* root@cary /vnode2/tsdb $ tree .tsma/
* .tsma/
*
* /vnode2/tsdb $ tree .sma/
* .sma/
* ├── v2t100.index_name_1
* ├── v2t101.index_name_1
* ├── v2t102.index_name_1
......@@ -66,7 +63,7 @@ typedef struct {
* 0 directories, 9 files
*/
typedef struct {
typedef struct {
pthread_rwlock_t lock;
SFSStatus *cstatus; // current status
......
......@@ -335,6 +335,17 @@ typedef struct {
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
typedef struct {
int fid;
int8_t state;
uint8_t ver;
#if 0
SDFInfo info;
#endif
STfsFile f;
TdFilePtr pFile;
} SSFile; // files split by days with fid
#define TSDB_LATEST_FSET_VER 0
#define TSDB_FSET_FID(s) ((s)->fid)
......
......@@ -19,26 +19,28 @@
typedef struct SSmaStat SSmaStat;
// insert/update interface
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData);
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData);
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
// query interface
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult);
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult);
// management interface
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg);
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
#if 0
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
#endif
// internal func
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) {
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) {
int32_t len = 0;
len += taosEncodeFixedU64(pData, tableUid);
len += taosEncodeFixedI64(pData, tableUid);
len += taosEncodeFixedU16(pData, colId);
len += taosEncodeFixedI64(pData, tsKey);
return len;
......
......@@ -227,21 +227,27 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
}
int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
char buf[512] = {0}; // TODO: may overflow
void *pBuf = NULL;
// char buf[512] = {0}; // TODO: may overflow
void *pBuf = NULL, *qBuf = NULL;
DBT key1 = {0}, value1 = {0};
{
// save sma info
pBuf = buf;
int32_t len = tEncodeTSma(NULL, pSmaCfg);
pBuf = calloc(len, 1);
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
key1.data = pSmaCfg->indexName;
key1.size = strlen(key1.data);
key1.data = (void *)&pSmaCfg->indexUid;
key1.size = sizeof(pSmaCfg->indexUid);
tEncodeTSma(&pBuf, pSmaCfg);
qBuf = pBuf;
tEncodeTSma(&qBuf, pSmaCfg);
value1.data = buf;
value1.size = POINTER_DISTANCE(pBuf, buf);
value1.data = pBuf;
value1.size = POINTER_DISTANCE(qBuf, pBuf);
value1.app_data = pSmaCfg;
}
......@@ -609,7 +615,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
return pTbCfg;
}
STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma * pCfg = NULL;
SMetaDB *pDB = pMeta->pDB;
DBT key = {0};
......@@ -617,8 +623,8 @@ STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
int ret;
// Set key/value
key.data = (void *)indexName;
key.size = strlen(indexName);
key.data = (void *)&indexUid;
key.size = sizeof(indexUid);
// Query
metaDBRLock(pDB);
......@@ -634,7 +640,10 @@ STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
return NULL;
}
tDecodeTSma(value.data, pCfg);
if (tDecodeTSma(value.data, pCfg) == NULL) {
tfree(pCfg);
return NULL;
}
return pCfg;
}
......@@ -871,7 +880,7 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) {
}
}
STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) {
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
STSmaWrapper *pSW = NULL;
pSW = calloc(sizeof(*pSW), 1);
......
......@@ -39,13 +39,13 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
*
* @param pTsdb
* @param param
* @param pData
* @param msg
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) {
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertTSmaDataImpl(pTsdb, param, pData)) < 0) {
if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
......@@ -56,12 +56,12 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) {
*
* @param pTsdb
* @param param
* @param pData
* @param msg
* @return int32_t
*/
int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData) {
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertRSmaDataImpl(pTsdb, param, pData)) < 0) {
if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
......
......@@ -43,20 +43,8 @@ TEST(testCase, tSmaEncodeDecodeTest) {
tSma.sliding = 0;
tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN);
tstrncpy(tSma.timezone, "Asia/Shanghai", TD_TIMEZONE_LEN);
tSma.indexUid = 2345678910;
tSma.tableUid = 1234567890;
tSma.nFuncColIds = 5;
tSma.funcColIds = (SFuncColIds *)calloc(tSma.nFuncColIds, sizeof(SFuncColIds));
ASSERT(tSma.funcColIds != NULL);
for (int32_t n = 0; n < tSma.nFuncColIds; ++n) {
SFuncColIds *funcColIds = tSma.funcColIds + n;
funcColIds->funcId = n;
funcColIds->nColIds = 10;
funcColIds->colIds = (col_id_t *)calloc(funcColIds->nColIds, sizeof(col_id_t));
ASSERT(funcColIds->colIds != NULL);
for (int32_t i = 0; i < funcColIds->nColIds; ++i) {
*(funcColIds->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
}
}
STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma};
uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper);
......@@ -85,35 +73,31 @@ TEST(testCase, tSmaEncodeDecodeTest) {
EXPECT_EQ(pSma->slidingUnit, qSma->slidingUnit);
EXPECT_STRCASEEQ(pSma->indexName, qSma->indexName);
EXPECT_STRCASEEQ(pSma->timezone, qSma->timezone);
EXPECT_EQ(pSma->nFuncColIds, qSma->nFuncColIds);
EXPECT_EQ(pSma->indexUid, qSma->indexUid);
EXPECT_EQ(pSma->tableUid, qSma->tableUid);
EXPECT_EQ(pSma->interval, qSma->interval);
EXPECT_EQ(pSma->sliding, qSma->sliding);
EXPECT_EQ(pSma->exprLen, qSma->exprLen);
EXPECT_STRCASEEQ(pSma->expr, qSma->expr);
EXPECT_EQ(pSma->tagsFilterLen, qSma->tagsFilterLen);
EXPECT_STRCASEEQ(pSma->tagsFilter, qSma->tagsFilter);
for (uint32_t j = 0; j < pSma->nFuncColIds; ++j) {
SFuncColIds *pFuncColIds = pSma->funcColIds + j;
SFuncColIds *qFuncColIds = qSma->funcColIds + j;
EXPECT_EQ(pFuncColIds->funcId, qFuncColIds->funcId);
EXPECT_EQ(pFuncColIds->nColIds, qFuncColIds->nColIds);
for (uint32_t k = 0; k < pFuncColIds->nColIds; ++k) {
EXPECT_EQ(*(pFuncColIds->colIds + k), *(qFuncColIds->colIds + k));
}
}
}
// resource release
tdDestroyTSma(&tSma);
tdDestroyTSmaWrapper(&dstTSmaWrapper);
}
#if 1
TEST(testCase, tSma_DB_Put_Get_Del_Test) {
const char * smaIndexName1 = "sma_index_test_1";
const char * smaIndexName2 = "sma_index_test_2";
const char * timeZone = "Asia/Shanghai";
const char * timezone = "Asia/Shanghai";
const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;";
const char * tagsFilter = "I'm tags filter";
const char * smaTestDir = "./smaTest";
const uint64_t tbUid = 1234567890;
const tb_uid_t tbUid = 1234567890;
const int64_t indexUid1 = 2000000001;
const int64_t indexUid2 = 2000000002;
const uint32_t nCntTSma = 2;
// encode
STSma tSma = {0};
......@@ -122,22 +106,15 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
tSma.interval = 1;
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
tSma.sliding = 0;
tSma.indexUid = indexUid1;
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
tstrncpy(tSma.timezone, timeZone, TD_TIMEZONE_LEN);
tstrncpy(tSma.timezone, timezone, TD_TIMEZONE_LEN);
tSma.tableUid = tbUid;
tSma.nFuncColIds = 5;
tSma.funcColIds = (SFuncColIds *)calloc(tSma.nFuncColIds, sizeof(SFuncColIds));
ASSERT(tSma.funcColIds != NULL);
for (int32_t n = 0; n < tSma.nFuncColIds; ++n) {
SFuncColIds *funcColIds = tSma.funcColIds + n;
funcColIds->funcId = n;
funcColIds->nColIds = 10;
funcColIds->colIds = (col_id_t *)calloc(funcColIds->nColIds, sizeof(col_id_t));
ASSERT(funcColIds->colIds != NULL);
for (int32_t i = 0; i < funcColIds->nColIds; ++i) {
*(funcColIds->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
}
}
tSma.exprLen = strlen(expr);
tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
tSma.tagsFilterLen = strlen(tagsFilter);
tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1);
tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1);
......@@ -151,8 +128,9 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
pMeta = metaOpen(smaTestDir, pMetaCfg, NULL);
assert(pMeta != NULL);
// save index 1
metaSaveSmaToDB(pMeta, pSmaCfg);
EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
pSmaCfg->indexUid = indexUid2;
tstrncpy(pSmaCfg->indexName, smaIndexName2, TSDB_INDEX_NAME_LEN);
pSmaCfg->version = 1;
pSmaCfg->intervalUnit = TD_TIME_UNIT_HOUR;
......@@ -161,24 +139,26 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
pSmaCfg->sliding = 5;
// save index 2
metaSaveSmaToDB(pMeta, pSmaCfg);
EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
// get value by indexName
STSma *qSmaCfg = NULL;
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1);
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid1);
assert(qSmaCfg != NULL);
printf("name1 = %s\n", qSmaCfg->indexName);
printf("timezone1 = %s\n", qSmaCfg->timezone);
printf("expr1 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : "");
printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
tdDestroyTSma(qSmaCfg);
tfree(qSmaCfg);
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2);
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2);
assert(qSmaCfg != NULL);
printf("name2 = %s\n", qSmaCfg->indexName);
printf("timezone2 = %s\n", qSmaCfg->timezone);
printf("expr2 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : "");
printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
EXPECT_EQ(qSmaCfg->interval, tSma.interval);
......@@ -201,17 +181,21 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
metaCloseSmaCurosr(pSmaCur);
// get wrapper by table uid
STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid);
STSmaWrapper *pSW = metaGetSmaInfoByTable(pMeta, tbUid);
assert(pSW != NULL);
EXPECT_EQ(pSW->number, nCntTSma);
EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1);
EXPECT_STRCASEEQ(pSW->tSma->timezone, timeZone);
EXPECT_STRCASEEQ(pSW->tSma->timezone, timezone);
EXPECT_STRCASEEQ(pSW->tSma->expr, expr);
EXPECT_STRCASEEQ(pSW->tSma->tagsFilter, tagsFilter);
EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid);
EXPECT_EQ(pSW->tSma->indexUid, indexUid1);
EXPECT_EQ(pSW->tSma->tableUid, tbUid);
EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2);
EXPECT_STRCASEEQ((pSW->tSma + 1)->timezone, timeZone);
EXPECT_STRCASEEQ((pSW->tSma + 1)->timezone, timezone);
EXPECT_STRCASEEQ((pSW->tSma + 1)->expr, expr);
EXPECT_STRCASEEQ((pSW->tSma + 1)->tagsFilter, tagsFilter);
EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid);
EXPECT_EQ((pSW->tSma + 1)->indexUid, indexUid2);
EXPECT_EQ((pSW->tSma + 1)->tableUid, tbUid);
tdDestroyTSmaWrapper(pSW);
tfree(pSW);
......@@ -233,44 +217,68 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
tdDestroyTSma(&tSma);
metaClose(pMeta);
}
#endif
#if 0
#if 1
TEST(testCase, tSmaInsertTest) {
STSma tSma = {0};
STSmaData *pSmaData = NULL;
STsdb tsdb = {0};
const int64_t indexUid = 2000000002;
STSmaDataWrapper *pSmaData = NULL;
STsdb tsdb = {0};
STsdbCfg * pCfg = &tsdb.config;
pCfg->daysPerFile = 1;
// init
tSma.intervalUnit = TD_TIME_UNIT_DAY;
tSma.interval = 1;
tSma.numOfFuncIds = 5; // sum/min/max/avg/last
int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t);
int32_t numOfColIds = 3;
int32_t numOfBlocks = 10;
int32_t dataLen = numOfColIds * numOfBlocks * blockSize;
pSmaData = (STSmaData *)malloc(sizeof(STSmaData) + dataLen);
ASSERT_EQ(pSmaData != NULL, true);
pSmaData->tableUid = 3232329230;
pSmaData->numOfColIds = numOfColIds;
pSmaData->numOfBlocks = numOfBlocks;
pSmaData->dataLen = dataLen;
pSmaData->tsWindow.skey = 1640000000;
pSmaData->tsWindow.ekey = 1645788649;
pSmaData->colIds = (col_id_t *)malloc(sizeof(col_id_t) * numOfColIds);
ASSERT_EQ(pSmaData->colIds != NULL, true);
for (int32_t i = 0; i < numOfColIds; ++i) {
*(pSmaData->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
int32_t allocCnt = 0;
int32_t allocStep = 40960;
int32_t buffer = 4096;
void * buf = NULL;
EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0);
int32_t bufSize = taosTSizeof(buf);
int32_t numOfTables = 25;
col_id_t numOfCols = 4096;
EXPECT_GT(numOfCols, 0);
pSmaData = (STSmaDataWrapper *)buf;
printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
pSmaData->skey = 1646987196;
pSmaData->interval = 10;
pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE;
pSmaData->indexUid = indexUid;
int32_t len = sizeof(STSmaDataWrapper);
for (int32_t t = 0; t < numOfTables; ++t) {
STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len);
pTbData->tableUid = t;
int32_t tableDataLen = sizeof(STSmaTbData);
for (col_id_t c = 0; c < numOfCols; ++c) {
if (bufSize - len - tableDataLen < buffer) {
EXPECT_EQ(tsdbMakeRoom(&buf, bufSize + allocStep), 0);
pSmaData = (STSmaDataWrapper *)buf;
pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len);
bufSize = taosTSizeof(buf);
printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
}
STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pSmaData, len + tableDataLen);
pColData->colId = c + PRIMARYKEY_TIMESTAMP_COL_ID;
pColData->blockSize = ((c & 1) == 0) ? 8 : 16;
// TODO: fill col data
tableDataLen += (sizeof(STSmaColData) + pColData->blockSize);
}
pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData));
len += tableDataLen;
// printf("bufSize=%d, len=%d, len of table[%d]=%d\n", bufSize, len, t, tableDataLen);
}
pSmaData->dataLen = (len - sizeof(STSmaDataWrapper));
EXPECT_GE(bufSize, pSmaData->dataLen);
// execute
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, &tSma, pSmaData), TSDB_CODE_SUCCESS);
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
// release
tdDestroySmaData(pSmaData);
taosTZfree(buf);
}
#endif
......
......@@ -253,6 +253,11 @@ typedef struct STaskIdInfo {
char* str;
} STaskIdInfo;
typedef struct STaskBufInfo {
int32_t bufSize; // total available buffer size in bytes
int32_t remainBuf; // remain buffer size
} STaskBufInfo;
typedef struct SExecTaskInfo {
STaskIdInfo id;
char* content;
......@@ -264,7 +269,8 @@ typedef struct SExecTaskInfo {
uint64_t totalRows; // total number of rows
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string
jmp_buf env; //
jmp_buf env; // when error occurs, abort
STaskBufInfo bufInfo; // available buffer info this task
struct SOperatorInfo* pRoot;
} SExecTaskInfo;
......@@ -307,9 +313,11 @@ typedef struct STaskRuntimeEnv {
} STaskRuntimeEnv;
enum {
OP_IN_EXECUTING = 1,
OP_RES_TO_RETURN = 2,
OP_EXEC_DONE = 3,
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
OP_IN_EXECUTING = 0x3,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
};
typedef struct SOperatorInfo {
......@@ -322,12 +330,14 @@ typedef struct SOperatorInfo {
SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t openFn;
__optr_fn_t nextDataFn;
__optr_fn_t getNextFn;
__optr_fn_t cleanupFn;
__optr_close_fn_t closeFn;
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
} SOperatorInfo;
typedef struct {
......@@ -378,9 +388,9 @@ typedef struct STaskParam {
} STaskParam;
enum {
DATA_NOT_READY = 0x1,
DATA_READY = 0x2,
DATA_EXHAUSTED = 0x3,
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
};
typedef struct SSourceDataInfo {
......@@ -639,12 +649,16 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
......@@ -674,10 +688,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
// SSDataBlock* doSLimit(void* param, bool* newgroup);
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
......@@ -691,7 +701,6 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo,
int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOfInputRows);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
......
......@@ -21,8 +21,6 @@
#include "tqueue.h"
#include "executorimpl.h"
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
typedef struct SDataDispatchBuf {
int32_t useSize;
int32_t allocSize;
......@@ -90,19 +88,6 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
data += pColRes->info.bytes * pInput->pData->info.rows;
}
}
int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL);
while (item) {
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid);
pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo);
item = taosHashIterate(pInput->pTableRetrieveTsMap, item);
}
}
// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
......@@ -113,7 +98,7 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0;
pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
pBuf->useSize = sizeof(SRetrieveTableRsp);
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
if (0 == pEntry->compressed) {
pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows;
......@@ -130,7 +115,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false;
}
pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows;
pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows;
pBuf->pData = malloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
......
......@@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t st = 0;
st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup);
*pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
......
......@@ -201,9 +201,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
pOperator->name = "dummyInputOpertor4Test";
if (numOfCols == 1) {
pOperator->nextDataFn = getDummyBlock;
pOperator->getNextFn = getDummyBlock;
} else {
pOperator->nextDataFn = get2ColsDummyBlock;
pOperator->getNextFn = get2ColsDummyBlock;
}
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));
......@@ -971,7 +971,7 @@ TEST(testCase, inMem_sort_Test) {
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false;
SSDataBlock* pRes = pOperator->nextDataFn(pOperator, &newgroup);
SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup);
SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
......@@ -1019,7 +1019,7 @@ TEST(testCase, external_sort_Test) {
return;
#endif
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {0};
......@@ -1049,7 +1049,7 @@ TEST(testCase, external_sort_Test) {
while(1) {
int64_t s = taosGetTimestampUs();
pRes = pOperator->nextDataFn(pOperator, &newgroup);
pRes = pOperator->getNextFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs();
if (t++ == 1) {
......@@ -1080,7 +1080,7 @@ TEST(testCase, external_sort_Test) {
}
TEST(testCase, sorted_merge_Test) {
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {0};
......@@ -1121,7 +1121,7 @@ TEST(testCase, sorted_merge_Test) {
while(1) {
int64_t s = taosGetTimestampUs();
pRes = pOperator->nextDataFn(pOperator, &newgroup);
pRes = pOperator->getNextFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs();
if (t++ == 1) {
......@@ -1152,7 +1152,7 @@ TEST(testCase, sorted_merge_Test) {
}
TEST(testCase, time_interval_Operator_Test) {
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {0};
......@@ -1199,7 +1199,7 @@ TEST(testCase, time_interval_Operator_Test) {
while(1) {
int64_t s = taosGetTimestampUs();
pRes = pOperator->nextDataFn(pOperator, &newgroup);
pRes = pOperator->getNextFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs();
if (t++ == 1) {
......
......@@ -25,7 +25,7 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
TEST(testCase, linear_hash_Tests) {
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
#if 0
......
......@@ -131,7 +131,6 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
case TSDB_DATA_TYPE_DOUBLE:
COPY_SCALAR_FIELD(datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
......
......@@ -1039,7 +1039,6 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
case TSDB_DATA_TYPE_DOUBLE:
code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
......@@ -1103,7 +1102,6 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
case TSDB_DATA_TYPE_DOUBLE:
code = tjsonGetDoubleValue(pJson, jkValueDatum, &pNode->datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
......
......@@ -390,7 +390,6 @@ void* nodesGetValueFromNode(SValueNode *pNode) {
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
return (void*)&pNode->datum.d;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
......
......@@ -287,7 +287,6 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
pVal->datum.d = strtold(pVal->literal, &endPtr);
break;
}
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
......@@ -601,7 +600,6 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool
static int32_t getPositionValue(const SValueNode* pVal) {
switch (pVal->node.resType.type) {
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
......@@ -1446,7 +1444,6 @@ static void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
case TSDB_DATA_TYPE_DOUBLE:
pVal->d = pNode->datum.d;
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
......
......@@ -56,7 +56,7 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicPlan = nullptr;
SPlanContext cxt = { .queryId = 1, .pAstRoot = query_->pRoot };
SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot };
code = createLogicPlan(&cxt, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
......
......@@ -495,7 +495,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
ASSERT(pRes->info.rows > 0);
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
SInputData inputData = {.pData = pRes};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
if (code) {
QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code));
......
......@@ -963,7 +963,7 @@ TEST(seqTest, randCase) {
stubSetRpcSendResponse();
stubSetCreateExecTask();
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
......@@ -1025,7 +1025,7 @@ TEST(seqTest, multithreadRand) {
stubSetStringToPlan();
stubSetRpcSendResponse();
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
......@@ -1076,7 +1076,7 @@ TEST(rcTest, shortExecshortDelay) {
stubSetPutDataBlock();
stubSetGetDataBlock();
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
......@@ -1157,7 +1157,7 @@ TEST(rcTest, longExecshortDelay) {
stubSetPutDataBlock();
stubSetGetDataBlock();
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
......@@ -1240,7 +1240,7 @@ TEST(rcTest, shortExeclongDelay) {
stubSetPutDataBlock();
stubSetGetDataBlock();
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
......@@ -1324,7 +1324,7 @@ TEST(rcTest, dropTest) {
stubSetPutDataBlock();
stubSetGetDataBlock();
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
......@@ -1358,7 +1358,7 @@ TEST(rcTest, dropTest) {
int main(int argc, char** argv) {
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......
......@@ -411,27 +411,26 @@ int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut) {
}
int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
/* NULL BOOL TINY SMAL INT BIG FLOA DOUB BINA TIME NCHA UTIN USMA UINT UBIG VARC VARB JSON DECI BLOB */
/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BOOL*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 0, 12, 13, 14, 7, 7, 0, 0, 0,
/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 7, 0, 0, 0,
/*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 7, 0, 0, 0,
/*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 7, 9, 7, 4, 4, 5, 7, 7, 7, 0, 0, 0,
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 7, 7, 0, 0, 0,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 7, 7, 0, 0, 0,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 7, 0, 0, 0,
/*BINA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 7, 7, 0, 0, 0,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0,
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 7, 7, 0, 0, 0,
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 7, 7, 0, 0, 0,
/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 7, 7, 0, 0, 0,
/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 0, 0, 0,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
/* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG VARB JSON DECI BLOB */
/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BOOL*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 0, 12, 13, 14, 7, 0, 0, 0,
/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 0, 0, 0,
/*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 7, 9, 7, 3, 4, 5, 7, 7, 0, 0, 0,
/*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 7, 9, 7, 4, 4, 5, 7, 7, 0, 0, 0,
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 7, 0, 0, 0,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 7, 0, 0, 0,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 7, 0, 0, 0,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 7, 0, 0, 0,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 7, 0, 0, 0,
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 7, 0, 0, 0,
/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 7, 0, 0, 0,
/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0,
/*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
};
int32_t vectorGetConvertType(int32_t type1, int32_t type2) {
......
......@@ -1286,7 +1286,7 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) {
int main(int argc, char** argv) {
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......
......@@ -1435,7 +1435,7 @@ TEST(columnTest, greater_and_lower) {
int main(int argc, char** argv) {
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......
......@@ -724,7 +724,7 @@ TEST(queryTest, flowCtrlCase) {
schtInitLogFile();
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
......@@ -873,7 +873,7 @@ TEST(multiThread, forceFree) {
}
int main(int argc, char** argv) {
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......
......@@ -42,6 +42,12 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId
cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj);
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj);
void syncIndexMgrLog(SSyncIndexMgr *pObj);
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj);
#ifdef __cplusplus
}
#endif
......
......@@ -211,11 +211,14 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
// for debug
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
void syncNodePrint(char* s, const SSyncNode* pSyncNode);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
void syncNodeLog(SSyncNode* pObj);
void syncNodeLog2(char* s, SSyncNode* pObj);
#ifdef __cplusplus
}
......
......@@ -47,6 +47,12 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
cJSON* syncRpcUnknownMsg2Json();
char* syncRpcMsg2Str(SRpcMsg* pRpcMsg);
// for debug ----------------------
void syncRpcMsgPrint(SRpcMsg* pMsg);
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
void syncRpcMsgLog(SRpcMsg* pMsg);
void syncRpcMsgLog2(char* s, SRpcMsg* pMsg);
// ---------------------------------------------
typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_PING = 100,
......@@ -60,17 +66,27 @@ typedef struct SyncTimeout {
ESyncTimeoutType timeoutType;
uint64_t logicClock;
int32_t timerMS;
void* data;
void* data; // need optimized
} SyncTimeout;
SyncTimeout* syncTimeoutBuild();
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data);
void syncTimeoutDestroy(SyncTimeout* pMsg);
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen);
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg);
char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len); //
SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len); //
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg); //
cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data);
char* syncTimeout2Str(const SyncTimeout* pMsg); //
// for debug ----------------------
void syncTimeoutPrint(const SyncTimeout* pMsg);
void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg);
void syncTimeoutLog(const SyncTimeout* pMsg);
void syncTimeoutLog2(char* s, const SyncTimeout* pMsg);
// ---------------------------------------------
typedef struct SyncPing {
......@@ -83,17 +99,25 @@ typedef struct SyncPing {
char data[];
} SyncPing;
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPing* syncPingBuild(uint32_t dataLen);
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId);
void syncPingDestroy(SyncPing* pMsg);
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen);
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg);
char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len);
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len);
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg);
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg);
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncPing2Json(const SyncPing* pMsg);
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId);
char* syncPing2Str(const SyncPing* pMsg);
// for debug ----------------------
void syncPingPrint(const SyncPing* pMsg);
void syncPingPrint2(char* s, const SyncPing* pMsg);
void syncPingLog(const SyncPing* pMsg);
void syncPingLog2(char* s, const SyncPing* pMsg);
// ---------------------------------------------
typedef struct SyncPingReply {
......@@ -106,18 +130,25 @@ typedef struct SyncPingReply {
char data[];
} SyncPingReply;
#define SYNC_PING_REPLY_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPingReply* syncPingReplyBuild(uint32_t dataLen);
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId);
void syncPingReplyDestroy(SyncPingReply* pMsg);
void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen);
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg);
char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len); //
SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len); //
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg);
void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg);
SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); //
cJSON* syncPingReply2Json(const SyncPingReply* pMsg);
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId);
char* syncPingReply2Str(const SyncPingReply* pMsg); //
// for debug ----------------------
void syncPingReplyPrint(const SyncPingReply* pMsg);
void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg);
void syncPingReplyLog(const SyncPingReply* pMsg);
void syncPingReplyLog2(char* s, const SyncPingReply* pMsg);
// ---------------------------------------------
typedef struct SyncClientRequest {
......@@ -131,13 +162,23 @@ typedef struct SyncClientRequest {
} SyncClientRequest;
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak);
void syncClientRequestDestroy(SyncClientRequest* pMsg);
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg);
char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len);
SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len);
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg);
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak);
char* syncClientRequest2Str(const SyncClientRequest* pMsg);
// for debug ----------------------
void syncClientRequestPrint(const SyncClientRequest* pMsg);
void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg);
void syncClientRequestLog(const SyncClientRequest* pMsg);
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
// ---------------------------------------------
typedef struct SyncClientRequestReply {
......@@ -163,9 +204,19 @@ SyncRequestVote* syncRequestVoteBuild();
void syncRequestVoteDestroy(SyncRequestVote* pMsg);
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg);
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len);
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len);
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg);
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg);
char* syncRequestVote2Str(const SyncRequestVote* pMsg);
// for debug ----------------------
void syncRequestVotePrint(const SyncRequestVote* pMsg);
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg);
void syncRequestVoteLog(const SyncRequestVote* pMsg);
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg);
// ---------------------------------------------
typedef struct SyncRequestVoteReply {
......@@ -178,13 +229,23 @@ typedef struct SyncRequestVoteReply {
bool voteGranted;
} SyncRequestVoteReply;
SyncRequestVoteReply* SyncRequestVoteReplyBuild();
SyncRequestVoteReply* syncRequestVoteReplyBuild();
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg);
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg);
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len);
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len);
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg);
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg);
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg);
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
// ---------------------------------------------
typedef struct SyncAppendEntries {
......@@ -200,17 +261,23 @@ typedef struct SyncAppendEntries {
char data[];
} SyncAppendEntries;
#define SYNC_APPEND_ENTRIES_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(SyncIndex) + sizeof(SyncTerm) + \
sizeof(SyncIndex) + sizeof(uint32_t))
SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen);
void syncAppendEntriesDestroy(SyncAppendEntries* pMsg);
void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg);
char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len);
SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len);
void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg);
void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg);
SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg);
char* syncAppendEntries2Str(const SyncAppendEntries* pMsg);
// for debug ----------------------
void syncAppendEntriesPrint(const SyncAppendEntries* pMsg);
void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg);
void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
// ---------------------------------------------
typedef struct SyncAppendEntriesReply {
......@@ -227,9 +294,19 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild();
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg);
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len);
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len);
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg);
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg);
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg);
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg);
// for debug ----------------------
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
#ifdef __cplusplus
}
......
......@@ -46,8 +46,12 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len);
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len);
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
void syncEntryPrint(const SSyncRaftEntry* pEntry);
void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry);
// for debug ----------------------
void syncEntryPrint(const SSyncRaftEntry* pObj);
void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj);
void syncEntryLog(const SSyncRaftEntry* pObj);
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj);
#ifdef __cplusplus
}
......
......@@ -32,39 +32,24 @@ typedef struct SSyncLogStoreData {
SWal* pWal;
} SSyncLogStoreData;
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
void logStoreDestory(SSyncLogStore* pLogStore);
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
// get one log entry, user need to free pEntry->pCont
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
void logStoreDestory(SSyncLogStore* pLogStore);
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
// return index of last entry
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
// return term of last entry
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);
// for debug
void logStorePrint(SSyncLogStore* pLogStore);
void logStorePrint2(char* s, SSyncLogStore* pLogStore);
void logStoreLog(SSyncLogStore* pLogStore);
void logStoreLog2(char* s, SSyncLogStore* pLogStore);
#ifdef __cplusplus
}
......
......@@ -42,7 +42,12 @@ int32_t raftStoreClose(SRaftStore *pRaftStore);
int32_t raftStorePersist(SRaftStore *pRaftStore);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
void raftStorePrint(SRaftStore *pRaftStore);
// for debug -------------------
void raftStorePrint(SRaftStore *pObj);
void raftStorePrint2(char *s, SRaftStore *pObj);
void raftStoreLog(SRaftStore *pObj);
void raftStoreLog2(char *s, SRaftStore *pObj);
#ifdef __cplusplus
}
......
......@@ -48,6 +48,12 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
cJSON * voteGranted2Json(SVotesGranted *pVotesGranted);
char * voteGranted2Str(SVotesGranted *pVotesGranted);
// for debug -------------------
void voteGrantedPrint(SVotesGranted *pObj);
void voteGrantedPrint2(char *s, SVotesGranted *pObj);
void voteGrantedLog(SVotesGranted *pObj);
void voteGrantedLog2(char *s, SVotesGranted *pObj);
// SVotesRespond -----------------------------
typedef struct SVotesRespond {
SRaftId (*replicas)[TSDB_MAX_REPLICA];
......@@ -65,6 +71,12 @@ void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
cJSON * votesRespond2Json(SVotesRespond *pVotesRespond);
char * votesRespond2Str(SVotesRespond *pVotesRespond);
// for debug -------------------
void votesRespondPrint(SVotesRespond *pObj);
void votesRespondPrint2(char *s, SVotesRespond *pObj);
void votesRespondLog(SVotesRespond *pObj);
void votesRespondLog2(char *s, SVotesRespond *pObj);
#ifdef __cplusplus
}
#endif
......
......@@ -28,7 +28,7 @@ static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer);
int32_t syncEnvStart() {
int32_t ret;
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv));
assert(gSyncEnv != NULL);
ret = doSyncEnvStart(gSyncEnv);
......
......@@ -44,7 +44,7 @@ int32_t syncIOStart(char *host, uint16_t port) {
gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL);
taosSeedRand(time(NULL));
taosSeedRand(taosGetTimestampSec());
int32_t ret = syncIOStartInternal(gSyncIO);
assert(ret == 0);
......@@ -263,7 +263,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply != NULL) {
SyncRequestVoteReply *pSyncMsg;
pSyncMsg = SyncRequestVoteReplyBuild();
pSyncMsg = syncRequestVoteReplyBuild();
syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
......
......@@ -97,4 +97,31 @@ char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncIndexMgrLog(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
......@@ -327,11 +327,31 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return serialized;
}
void syncNodePrint(char* s, const SSyncNode* pSyncNode) {
char* ss = syncNode2Str(pSyncNode);
// sTrace("syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss);
fprintf(stderr, "syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss);
free(ss);
// for debug --------------
void syncNodePrint(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncNodePrint2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncNodeLog(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncNodeLog2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
......
此差异已折叠。
......@@ -104,14 +104,29 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
return serialized;
}
void syncEntryPrint(const SSyncRaftEntry* pEntry) {
char* s = syncEntry2Str(pEntry);
sTrace("%s", s);
free(s);
// for debug ----------------------
void syncEntryPrint(const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pObj);
printf("syncEntryPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pObj);
printf("syncEntryPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncEntryLog(const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pObj);
sTrace("syncEntryLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) {
char* ss = syncEntry2Str(pEntry);
sTrace("%s | %s", s, ss);
free(ss);
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pObj);
sTrace("syncEntryLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
......@@ -43,7 +43,6 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
}
}
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......@@ -61,7 +60,6 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
free(serialized);
}
// get one log entry, user need to free pEntry->pCont
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......@@ -77,14 +75,12 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
return pEntry;
}
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walRollback(pWal, fromIndex);
}
// return index of last entry
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......@@ -92,7 +88,6 @@ SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
return lastIndex;
}
// return term of last entry
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
......@@ -103,14 +98,12 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
return lastTerm;
}
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walCommit(pWal, index);
}
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
return pData->pSyncNode->commitIndex;
......@@ -163,11 +156,29 @@ char* logStore2Str(SSyncLogStore* pLogStore) {
return serialized;
}
// for debug
// for debug -----------------
void logStorePrint(SSyncLogStore* pLogStore) {
char* s = logStore2Str(pLogStore);
// sTrace("%s", s);
fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s);
char* serialized = logStore2Str(pLogStore);
printf("logStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
printf("logStorePrint | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
free(s);
void logStoreLog(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
sTrace("logStorePrint | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
......@@ -135,8 +135,30 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
return 0;
}
void raftStorePrint(SRaftStore *pRaftStore) {
char storeBuf[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
printf("%s\n", storeBuf);
// for debug -------------------
void raftStorePrint(SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
}
void raftStorePrint2(char *s, SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
}
void raftStoreLog(SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized);
fflush(NULL);
}
void raftStoreLog2(char *s, SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
fflush(NULL);
}
......@@ -119,6 +119,33 @@ char *voteGranted2Str(SVotesGranted *pVotesGranted) {
return serialized;
}
// for debug -------------------
void voteGrantedPrint(SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
printf("voteGrantedPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void voteGrantedPrint2(char *s, SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
printf("voteGrantedPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void voteGrantedLog(SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
sTrace("voteGrantedLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void voteGrantedLog2(char *s, SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
sTrace("voteGrantedLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
// SVotesRespond -----------------------------
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond));
......@@ -210,4 +237,31 @@ char *votesRespond2Str(SVotesRespond *pVotesRespond) {
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug -------------------
void votesRespondPrint(SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
printf("votesRespondPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void votesRespondPrint2(char *s, SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
printf("votesRespondPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void votesRespondLog(SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
sTrace("votesRespondLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void votesRespondLog2(char *s, SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
sTrace("votesRespondLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
add_executable(syncTest "")
add_executable(syncEnvTest "")
add_executable(syncPingTest "")
add_executable(syncEncodeTest "")
add_executable(syncPingTimerTest "")
add_executable(syncIOTickQTest "")
add_executable(syncIOTickPingTest "")
add_executable(syncIOSendMsgTest "")
......@@ -17,6 +16,15 @@ add_executable(syncVotesRespondTest "")
add_executable(syncIndexMgrTest "")
add_executable(syncLogStoreTest "")
add_executable(syncEntryTest "")
add_executable(syncRequestVoteTest "")
add_executable(syncRequestVoteReplyTest "")
add_executable(syncAppendEntriesTest "")
add_executable(syncAppendEntriesReplyTest "")
add_executable(syncClientRequestTest "")
add_executable(syncTimeoutTest "")
add_executable(syncPingTest "")
add_executable(syncPingReplyTest "")
add_executable(syncRpcMsgTest "")
target_sources(syncTest
......@@ -27,13 +35,9 @@ target_sources(syncEnvTest
PRIVATE
"syncEnvTest.cpp"
)
target_sources(syncPingTest
target_sources(syncPingTimerTest
PRIVATE
"syncPingTest.cpp"
)
target_sources(syncEncodeTest
PRIVATE
"syncEncodeTest.cpp"
"syncPingTimerTest.cpp"
)
target_sources(syncIOTickQTest
PRIVATE
......@@ -95,6 +99,42 @@ target_sources(syncEntryTest
PRIVATE
"syncEntryTest.cpp"
)
target_sources(syncRequestVoteTest
PRIVATE
"syncRequestVoteTest.cpp"
)
target_sources(syncRequestVoteReplyTest
PRIVATE
"syncRequestVoteReplyTest.cpp"
)
target_sources(syncAppendEntriesTest
PRIVATE
"syncAppendEntriesTest.cpp"
)
target_sources(syncAppendEntriesReplyTest
PRIVATE
"syncAppendEntriesReplyTest.cpp"
)
target_sources(syncClientRequestTest
PRIVATE
"syncClientRequestTest.cpp"
)
target_sources(syncTimeoutTest
PRIVATE
"syncTimeoutTest.cpp"
)
target_sources(syncPingTest
PRIVATE
"syncPingTest.cpp"
)
target_sources(syncPingReplyTest
PRIVATE
"syncPingReplyTest.cpp"
)
target_sources(syncRpcMsgTest
PRIVATE
"syncRpcMsgTest.cpp"
)
target_include_directories(syncTest
......@@ -107,12 +147,7 @@ target_include_directories(syncEnvTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncPingTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEncodeTest
target_include_directories(syncPingTimerTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
......@@ -192,6 +227,51 @@ target_include_directories(syncEntryTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRequestVoteTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRequestVoteReplyTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncAppendEntriesTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncAppendEntriesReplyTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncClientRequestTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncTimeoutTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncPingTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncPingReplyTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRpcMsgTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -202,11 +282,7 @@ target_link_libraries(syncEnvTest
sync
gtest_main
)
target_link_libraries(syncPingTest
sync
gtest_main
)
target_link_libraries(syncEncodeTest
target_link_libraries(syncPingTimerTest
sync
gtest_main
)
......@@ -270,6 +346,42 @@ target_link_libraries(syncEntryTest
sync
gtest_main
)
target_link_libraries(syncRequestVoteTest
sync
gtest_main
)
target_link_libraries(syncRequestVoteReplyTest
sync
gtest_main
)
target_link_libraries(syncAppendEntriesTest
sync
gtest_main
)
target_link_libraries(syncAppendEntriesReplyTest
sync
gtest_main
)
target_link_libraries(syncClientRequestTest
sync
gtest_main
)
target_link_libraries(syncTimeoutTest
sync
gtest_main
)
target_link_libraries(syncPingTest
sync
gtest_main
)
target_link_libraries(syncPingReplyTest
sync
gtest_main
)
target_link_libraries(syncRpcMsgTest
sync
gtest_main
)
enable_testing()
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -89,7 +89,7 @@ int main(int argc, char** argv) {
SSyncNode* pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
syncNodePrint((char*)"syncInitTest", pSyncNode);
syncNodePrint2((char*)"syncInitTest", pSyncNode);
initRaftId(pSyncNode);
......
......@@ -83,7 +83,7 @@ SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() {
logStorePrint(pSyncNode->pLogStore);
for (int i = 0; i < 5; ++i) {
int32_t dataLen = 10;
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
......@@ -94,7 +94,7 @@ void logStoreTest() {
pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
snprintf(pEntry->data, dataLen, "value%d", i);
//syncEntryPrint2((char*)"write entry:", pEntry);
// syncEntryPrint2((char*)"write entry:", pEntry);
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
syncEntryDestory(pEntry);
}
......@@ -132,8 +132,8 @@ int main(int argc, char** argv) {
pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
//syncNodePrint((char*)"syncLogStoreTest", pSyncNode);
//initRaftId(pSyncNode);
// syncNodePrint((char*)"syncLogStoreTest", pSyncNode);
// initRaftId(pSyncNode);
logStoreTest();
......
此差异已折叠。
此差异已折叠。
......@@ -22,15 +22,21 @@ int main() {
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
assert(pRaftStore != NULL);
raftStorePrint(pRaftStore);
#if 0
pRaftStore->currentTerm = 100;
pRaftStore->voteFor.addr = 200;
pRaftStore->voteFor.vgId = 300;
raftStorePersist(pRaftStore);
raftStorePrint(pRaftStore);
#endif
++(pRaftStore->currentTerm);
++(pRaftStore->voteFor.addr);
++(pRaftStore->voteFor.vgId);
raftStorePersist(pRaftStore);
raftStorePrint(pRaftStore);
return 0;
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册