提交 fe14f49c 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/dnode

...@@ -13,6 +13,7 @@ ELSEIF (TD_WINDOWS) ...@@ -13,6 +13,7 @@ ELSEIF (TD_WINDOWS)
INSTALL(FILES ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg DESTINATION cfg) INSTALL(FILES ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg DESTINATION cfg)
INSTALL(FILES ${TD_SOURCE_DIR}/include/client/taos.h DESTINATION include) INSTALL(FILES ${TD_SOURCE_DIR}/include/client/taos.h DESTINATION include)
INSTALL(FILES ${TD_SOURCE_DIR}/include/util/taoserror.h DESTINATION include) INSTALL(FILES ${TD_SOURCE_DIR}/include/util/taoserror.h DESTINATION include)
INSTALL(FILES ${TD_SOURCE_DIR}/include/libs/function/taosudf.h DESTINATION include)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver) INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos_static.lib DESTINATION driver) INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos_static.lib DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver) INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TAOSUDF_H
#define TDENGINE_TAOSUDF_H
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>
#include <taoserror.h>
#ifdef __cplusplus
extern "C" {
#endif
#if defined(__GNUC__)
#define FORCE_INLINE inline __attribute__((always_inline))
#else
#define FORCE_INLINE
#endif
typedef struct SUdfColumnMeta {
int16_t type;
int32_t bytes;
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
typedef struct SUdfColumnData {
int32_t numOfRows;
int32_t rowsAlloc;
union {
struct {
int32_t nullBitmapLen;
char *nullBitmap;
int32_t dataLen;
char *data;
} fixLenCol;
struct {
int32_t varOffsetsLen;
int32_t *varOffsets;
int32_t payloadLen;
char *payload;
int32_t payloadAllocLen;
} varLenCol;
};
} SUdfColumnData;
typedef struct SUdfColumn {
SUdfColumnMeta colMeta;
bool hasNull;
SUdfColumnData colData;
} SUdfColumn;
typedef struct SUdfDataBlock {
int32_t numOfRows;
int32_t numOfCols;
SUdfColumn **udfCols;
} SUdfDataBlock;
typedef struct SUdfInterBuf {
int32_t bufLen;
char* buf;
int8_t numOfResult; //zero or one
} SUdfInterBuf;
typedef void *UdfcFuncHandle;
// dynamic lib init and destroy
typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfDestroyFunc)();
#define UDF_MEMORY_EXP_GROWTH 1.5
#define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
} while (0)
#define udfColDataSetNotNull_f(pColumn, r_) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
typedef uint16_t VarDataLenT; // maxVarDataLen: 32767
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE)
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v))
#define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
#define IS_VAR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR) || ((t) == TSDB_DATA_TYPE_JSON))
#define IS_STR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR))
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
} else {
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
}
}
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
if (udfColDataIsNull_var(pColumn, row)) {
return true;
}
char* data = udfColDataGetData(pColumn, row);
return (*data == TSDB_DATA_TYPE_NULL);
} else {
return udfColDataIsNull_var(pColumn, row);
}
} else {
return udfColDataIsNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
if (newCapacity== 0 || newCapacity <= data->rowsAlloc) {
return TSDB_CODE_SUCCESS;
}
int allocCapacity = (data->rowsAlloc< 8) ? 8 : data->rowsAlloc;
while (allocCapacity < newCapacity) {
allocCapacity *= UDF_MEMORY_EXP_GROWTH;
}
if (IS_VAR_DATA_TYPE(meta->type)) {
char* tmp = (char*)realloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.varOffsets = (int32_t*)tmp;
data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity;
// for payload, add data in udfColDataAppend
} else {
char* tmp = (char*)realloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.nullBitmap = tmp;
data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity);
if (meta->type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
}
tmp = (char*)realloc(data->fixLenCol.data, allocCapacity* meta->bytes);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.data = tmp;
data->fixLenCol.dataLen = allocCapacity* meta->bytes;
}
data->rowsAlloc = allocCapacity;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row+1);
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row);
} else {
udfColDataSetNull_f(pColumn, row);
}
pColumn->hasNull = true;
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) {
udfColDataSetNull(pColumn, currentRow);
} else {
if (!isVarCol) {
udfColDataSetNotNull_f(pColumn, currentRow);
memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
} else {
int32_t dataLen = varDataTLen(pData);
if (meta->type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + sizeof(char));
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = sizeof(int64_t);
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = sizeof(char);
}
dataLen += sizeof(char);
}
if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) {
uint32_t newSize = data->varLenCol.payloadAllocLen;
if (newSize <= 1) {
newSize = 8;
}
while (newSize < data->varLenCol.payloadLen + dataLen) {
newSize = newSize * UDF_MEMORY_EXP_GROWTH;
}
char *buf = (char*)realloc(data->varLenCol.payload, newSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.payload = buf;
data->varLenCol.payloadAllocLen = newSize;
}
uint32_t len = data->varLenCol.payloadLen;
data->varLenCol.varOffsets[currentRow] = len;
memcpy(data->varLenCol.payload + len, pData, dataLen);
data->varLenCol.payloadLen += dataLen;
}
}
data->numOfRows = (currentRow + 1 > data->numOfRows) ? (currentRow+1) : data->numOfRows;
return 0;
}
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TAOSUDF_H
...@@ -16,6 +16,13 @@ ...@@ -16,6 +16,13 @@
#ifndef TDENGINE_TUDF_H #ifndef TDENGINE_TUDF_H
#define TDENGINE_TUDF_H #define TDENGINE_TUDF_H
#undef malloc
#define malloc malloc
#undef free
#define free free
#undef realloc
#define alloc alloc
#include <taosudf.h>
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
...@@ -36,56 +43,6 @@ extern "C" { ...@@ -36,56 +43,6 @@ extern "C" {
#endif #endif
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID" #define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
//======================================================================================
//begin API to taosd and qworker
typedef struct SUdfColumnMeta {
int16_t type;
int32_t bytes;
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
typedef struct SUdfColumnData {
int32_t numOfRows;
int32_t rowsAlloc;
union {
struct {
int32_t nullBitmapLen;
char *nullBitmap;
int32_t dataLen;
char *data;
} fixLenCol;
struct {
int32_t varOffsetsLen;
int32_t *varOffsets;
int32_t payloadLen;
char *payload;
int32_t payloadAllocLen;
} varLenCol;
};
} SUdfColumnData;
typedef struct SUdfColumn {
SUdfColumnMeta colMeta;
bool hasNull;
SUdfColumnData colData;
} SUdfColumn;
typedef struct SUdfDataBlock {
int32_t numOfRows;
int32_t numOfCols;
SUdfColumn **udfCols;
} SUdfDataBlock;
typedef struct SUdfInterBuf {
int32_t bufLen;
char* buf;
int8_t numOfResult; //zero or one
} SUdfInterBuf;
typedef void *UdfcFuncHandle;
//low level APIs //low level APIs
/** /**
...@@ -127,177 +84,6 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); ...@@ -127,177 +84,6 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
int32_t cleanUpUdfs(); int32_t cleanUpUdfs();
// end API to taosd and qworker
//=============================================================================================================================
// begin API to UDF writer.
// dynamic lib init and destroy
typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfDestroyFunc)();
//TODO: add API to check function arguments type, number etc.
#define UDF_MEMORY_EXP_GROWTH 1.5
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
} while (0)
#define udfColDataSetNotNull_f(pColumn, r_) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
} else {
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
}
}
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
if (udfColDataIsNull_var(pColumn, row)) {
return true;
}
char* data = udfColDataGetData(pColumn, row);
return (*data == TSDB_DATA_TYPE_NULL);
} else {
return udfColDataIsNull_var(pColumn, row);
}
} else {
return udfColDataIsNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
if (newCapacity== 0 || newCapacity <= data->rowsAlloc) {
return TSDB_CODE_SUCCESS;
}
int allocCapacity = TMAX(data->rowsAlloc, 8);
while (allocCapacity < newCapacity) {
allocCapacity *= UDF_MEMORY_EXP_GROWTH;
}
if (IS_VAR_DATA_TYPE(meta->type)) {
char* tmp = taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.varOffsets = (int32_t*)tmp;
data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity;
// for payload, add data in udfColDataAppend
} else {
char* tmp = taosMemoryRealloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.nullBitmap = tmp;
data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity);
if (meta->type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
}
tmp = taosMemoryRealloc(data->fixLenCol.data, allocCapacity* meta->bytes);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.data = tmp;
data->fixLenCol.dataLen = allocCapacity* meta->bytes;
}
data->rowsAlloc = allocCapacity;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row+1);
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row);
} else {
udfColDataSetNull_f(pColumn, row);
}
pColumn->hasNull = true;
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) {
udfColDataSetNull(pColumn, currentRow);
} else {
if (!isVarCol) {
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
} else {
int32_t dataLen = varDataTLen(pData);
if (meta->type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
}
dataLen += CHAR_BYTES;
}
if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) {
uint32_t newSize = data->varLenCol.payloadAllocLen;
if (newSize <= 1) {
newSize = 8;
}
while (newSize < data->varLenCol.payloadLen + dataLen) {
newSize = newSize * UDF_MEMORY_EXP_GROWTH;
}
char *buf = taosMemoryRealloc(data->varLenCol.payload, newSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.payload = buf;
data->varLenCol.payloadAllocLen = newSize;
}
uint32_t len = data->varLenCol.payloadLen;
data->varLenCol.varOffsets[currentRow] = len;
memcpy(data->varLenCol.payload + len, pData, dataLen);
data->varLenCol.payloadLen += dataLen;
}
}
data->numOfRows = TMAX(currentRow + 1, data->numOfRows);
return 0;
}
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
// end API to UDF writer
//=======================================================================================================================
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
// clang-format off // clang-format off
#define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code)))) #define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code))))
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
#define TAOS_SUCCEEDED(err) ((err) >= 0) #define TAOS_SUCCEEDED(err) ((err) >= 0)
#define TAOS_FAILED(err) ((err) < 0) #define TAOS_FAILED(err) ((err) < 0)
...@@ -35,7 +35,7 @@ const char* terrstr(); ...@@ -35,7 +35,7 @@ const char* terrstr();
int32_t* taosGetErrno(); int32_t* taosGetErrno();
#define terrno (*taosGetErrno()) #define terrno (*taosGetErrno())
#define TSDB_CODE_SUCCESS 0 #define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error #define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
......
...@@ -170,7 +170,7 @@ function check_lib_path() { ...@@ -170,7 +170,7 @@ function check_lib_path() {
function check_header_path() { function check_header_path() {
# check all header # check all header
header_dir=("taos.h" "taosdef.h" "taoserror.h") header_dir=("taos.h" "taosdef.h" "taoserror.h" "taosudf.h")
for i in "${header_dir[@]}";do for i in "${header_dir[@]}";do
check_link ${inc_link_dir}/$i check_link ${inc_link_dir}/$i
done done
......
...@@ -29,6 +29,7 @@ else ...@@ -29,6 +29,7 @@ else
${csudo}rm -f ${bin_link_dir}/taosdemo || : ${csudo}rm -f ${bin_link_dir}/taosdemo || :
${csudo}rm -f ${cfg_link_dir}/* || : ${csudo}rm -f ${cfg_link_dir}/* || :
${csudo}rm -f ${inc_link_dir}/taos.h || : ${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${log_link_dir} || : ${csudo}rm -f ${log_link_dir} || :
......
...@@ -70,6 +70,7 @@ cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_pat ...@@ -70,6 +70,7 @@ cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_pat
cp ${compile_dir}/../include/client/taos.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../include/client/taos.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/common/taosdef.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../include/common/taosdef.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/util/taoserror.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../include/util/taoserror.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/libs/function/taosudf.h ${pkg_dir}${install_home_path}/include
cp -r ${top_dir}/examples/* ${pkg_dir}${install_home_path}/examples cp -r ${top_dir}/examples/* ${pkg_dir}${install_home_path}/examples
#cp -r ${top_dir}/src/connector/python ${pkg_dir}${install_home_path}/connector #cp -r ${top_dir}/src/connector/python ${pkg_dir}${install_home_path}/connector
#cp -r ${top_dir}/src/connector/go ${pkg_dir}${install_home_path}/connector #cp -r ${top_dir}/src/connector/go ${pkg_dir}${install_home_path}/connector
......
...@@ -77,6 +77,7 @@ cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driv ...@@ -77,6 +77,7 @@ cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driv
cp %{_compiledir}/../include/client/taos.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/client/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/common/taosdef.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/common/taosdef.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/util/taoserror.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/util/taoserror.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/libs/function/taosudf.h %{buildroot}%{homepath}/include
#cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector #cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector
#cp -r %{_compiledir}/../src/connector/go %{buildroot}%{homepath}/connector #cp -r %{_compiledir}/../src/connector/go %{buildroot}%{homepath}/connector
#cp -r %{_compiledir}/../src/connector/nodejs %{buildroot}%{homepath}/connector #cp -r %{_compiledir}/../src/connector/nodejs %{buildroot}%{homepath}/connector
...@@ -201,6 +202,7 @@ if [ $1 -eq 0 ];then ...@@ -201,6 +202,7 @@ if [ $1 -eq 0 ];then
${csudo}rm -f ${inc_link_dir}/taos.h || : ${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || : ${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${log_link_dir} || : ${csudo}rm -f ${log_link_dir} || :
......
...@@ -314,11 +314,12 @@ function install_jemalloc() { ...@@ -314,11 +314,12 @@ function install_jemalloc() {
} }
function install_header() { function install_header() {
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/* ${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/*
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h ${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h ${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h ${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
} }
function add_newHostname_to_hosts() { function add_newHostname_to_hosts() {
......
...@@ -115,11 +115,12 @@ function install_bin() { ...@@ -115,11 +115,12 @@ function install_bin() {
} }
function install_header() { function install_header() {
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/* ${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/*
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h ${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h ${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h ${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
} }
function install_jemalloc() { function install_jemalloc() {
......
...@@ -148,11 +148,12 @@ function install_lib() { ...@@ -148,11 +148,12 @@ function install_lib() {
} }
function install_header() { function install_header() {
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/* ${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/*
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h ${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h ${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h ${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
} }
function install_jemalloc() { function install_jemalloc() {
......
...@@ -349,16 +349,17 @@ function install_lib() { ...@@ -349,16 +349,17 @@ function install_lib() {
function install_header() { function install_header() {
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h \ ${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/*
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h ${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h ${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h ${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
else else
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h \ ${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
${install_main_dir}/include || ${install_main_dir}/include ||
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h \ ${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
${install_main_2_dir}/include && ${install_main_2_dir}/include &&
${csudo}chmod 644 ${install_main_dir}/include/* || ${csudo}chmod 644 ${install_main_dir}/include/* ||
${csudo}chmod 644 ${install_main_2_dir}/include/* ${csudo}chmod 644 ${install_main_2_dir}/include/*
......
...@@ -36,7 +36,7 @@ fi ...@@ -36,7 +36,7 @@ fi
bin_files="${build_dir}/bin/tarbitrator ${script_dir}/remove_arbi.sh" bin_files="${build_dir}/bin/tarbitrator ${script_dir}/remove_arbi.sh"
install_files="${script_dir}/install_arbi.sh" install_files="${script_dir}/install_arbi.sh"
#header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h" #header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h"
init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord
init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord
......
...@@ -62,7 +62,7 @@ else ...@@ -62,7 +62,7 @@ else
lib_files="${build_dir}/lib/libtaos.${version}.dylib" lib_files="${build_dir}/lib/libtaos.${version}.dylib"
fi fi
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h" header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h"
if [ "$dbName" != "taos" ]; then if [ "$dbName" != "taos" ]; then
cfg_dir="${top_dir}/../enterprise/packaging/cfg" cfg_dir="${top_dir}/../enterprise/packaging/cfg"
else else
......
...@@ -93,7 +93,7 @@ else ...@@ -93,7 +93,7 @@ else
fi fi
lib_files="${build_dir}/lib/libtaos.so.${version}" lib_files="${build_dir}/lib/libtaos.so.${version}"
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h" header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h"
if [ "$dbName" != "taos" ]; then if [ "$dbName" != "taos" ]; then
cfg_dir="${top_dir}/../enterprise/packaging/cfg" cfg_dir="${top_dir}/../enterprise/packaging/cfg"
......
...@@ -81,10 +81,11 @@ function kill_taosd() { ...@@ -81,10 +81,11 @@ function kill_taosd() {
} }
function install_include() { function install_include() {
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h|| : ${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}ln -s ${inc_dir}/taos.h ${inc_link_dir}/taos.h ${csudo}ln -s ${inc_dir}/taos.h ${inc_link_dir}/taos.h
${csudo}ln -s ${inc_dir}/taosdef.h ${inc_link_dir}/taosdef.h ${csudo}ln -s ${inc_dir}/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -s ${inc_dir}/taoserror.h ${inc_link_dir}/taoserror.h ${csudo}ln -s ${inc_dir}/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${inc_dir}/taosudf.h ${inc_link_dir}/taosudf.h
} }
function install_lib() { function install_lib() {
......
...@@ -128,6 +128,7 @@ ${csudo}rm -f ${cfg_link_dir}/*.new || : ...@@ -128,6 +128,7 @@ ${csudo}rm -f ${cfg_link_dir}/*.new || :
${csudo}rm -f ${inc_link_dir}/taos.h || : ${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || : ${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${lib64_link_dir}/libtaos.* || : ${csudo}rm -f ${lib64_link_dir}/libtaos.* || :
......
...@@ -84,6 +84,7 @@ function clean_header() { ...@@ -84,6 +84,7 @@ function clean_header() {
${csudo} rm -f ${inc_link_dir}/taos.h || : ${csudo} rm -f ${inc_link_dir}/taos.h || :
${csudo} rm -f ${inc_link_dir}/taosdef.h || : ${csudo} rm -f ${inc_link_dir}/taosdef.h || :
${csudo} rm -f ${inc_link_dir}/taoserror.h || : ${csudo} rm -f ${inc_link_dir}/taoserror.h || :
${csudo} rm -f ${inc_link_dir}/taosudf.h || :
} }
function clean_config() { function clean_config() {
......
...@@ -59,6 +59,8 @@ function clean_header() { ...@@ -59,6 +59,8 @@ function clean_header() {
${csudo}rm -f ${inc_link_dir}/taos.h || : ${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || : ${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
} }
function clean_log() { function clean_log() {
......
...@@ -54,6 +54,7 @@ function clean_header() { ...@@ -54,6 +54,7 @@ function clean_header() {
${csudo}rm -f ${inc_link_dir}/taos.h || : ${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || : ${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
} }
function clean_config() { function clean_config() {
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndAcct.h" #include "mndAcct.h"
#include "mndPrivilege.h"
#include "mndBnode.h" #include "mndBnode.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndConsumer.h" #include "mndConsumer.h"
...@@ -27,6 +26,7 @@ ...@@ -27,6 +26,7 @@
#include "mndMnode.h" #include "mndMnode.h"
#include "mndOffset.h" #include "mndOffset.h"
#include "mndPerfSchema.h" #include "mndPerfSchema.h"
#include "mndPrivilege.h"
#include "mndProfile.h" #include "mndProfile.h"
#include "mndQnode.h" #include "mndQnode.h"
#include "mndQuery.h" #include "mndQuery.h"
...@@ -416,7 +416,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -416,7 +416,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
char *syncNodeStr = sync2SimpleStr(pMgmt->sync); char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
static int64_t mndTick = 0; static int64_t mndTick = 0;
if (++mndTick % 10 == 1) { if (++mndTick % 10 == 1) {
mTrace("vgId:%d, sync heartbeat msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); mTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
} }
if (gRaftDetailLog) { if (gRaftDetailLog) {
char logBuf[512] = {0}; char logBuf[512] = {0};
......
...@@ -205,16 +205,16 @@ struct STFile { ...@@ -205,16 +205,16 @@ struct STFile {
uint8_t state; uint8_t state;
}; };
#define TD_FILE_F(tf) (&((tf)->f)) #define TD_TFILE_F(tf) (&((tf)->f))
#define TD_FILE_PFILE(tf) ((tf)->pFile) #define TD_TFILE_PFILE(tf) ((tf)->pFile)
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) #define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_FILE_FULL_NAME(tf) (TD_FILE_F(tf)->aname) #define TD_TFILE_FULL_NAME(tf) (TD_TFILE_F(tf)->aname)
#define TD_FILE_REL_NAME(tf) (TD_FILE_F(tf)->rname) #define TD_TFILE_REL_NAME(tf) (TD_TFILE_F(tf)->rname)
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) #define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf)) #define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf))
#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL) #define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL)
#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s)) #define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did) #define TD_TFILE_DID(tf) (TD_TFILE_F(tf)->did)
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname); int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname);
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType); int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType);
......
...@@ -64,6 +64,7 @@ typedef struct STsdbSnapshotReader STsdbSnapshotReader; ...@@ -64,6 +64,7 @@ typedef struct STsdbSnapshotReader STsdbSnapshotReader;
#define VNODE_TQ_DIR "tq" #define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal" #define VNODE_WAL_DIR "wal"
#define VNODE_TSMA_DIR "tsma" #define VNODE_TSMA_DIR "tsma"
#define VNODE_RSMA_DIR "rsma"
#define VNODE_RSMA0_DIR "tsdb" #define VNODE_RSMA0_DIR "tsdb"
#define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2" #define VNODE_RSMA2_DIR "rsma2"
...@@ -161,7 +162,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool ...@@ -161,7 +162,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
// sma // sma
int32_t smaOpen(SVnode* pVnode); int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma);
int32_t smaCloseEnv(SSma* pSma); int32_t smaCloseEnv(SSma* pSma);
int32_t smaCloseEx(SSma* pSma); int32_t smaCloseEx(SSma* pSma);
......
...@@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) { ...@@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) {
} }
// restore the rsma // restore the rsma
#if 0 #if 1
if (rsmaRestore(pSma) < 0) { if (rsmaRestore(pSma) < 0) {
goto _err; goto _err;
} }
...@@ -154,12 +154,6 @@ int32_t smaCloseEx(SSma *pSma) { ...@@ -154,12 +154,6 @@ int32_t smaCloseEx(SSma *pSma) {
return 0; return 0;
} }
int32_t smaClose(SSma *pSma) {
smaCloseEnv(pSma);
smaCloseEx(pSma);
return 0;
}
/** /**
* @brief rsma env restore * @brief rsma env restore
* *
......
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF #define TD_FILE_INIT_MAGIC 0xFFFFFFFF
static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo); static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo);
static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo); static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo);
...@@ -46,7 +45,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { ...@@ -46,7 +45,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
} }
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_FILE_OPENED(pTFile)); ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte); int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte);
if (nwrite < nbyte) { if (nwrite < nbyte) {
...@@ -58,9 +57,9 @@ int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { ...@@ -58,9 +57,9 @@ int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) {
} }
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
ASSERT(TD_FILE_OPENED(pTFile)); ASSERT(TD_TFILE_OPENED(pTFile));
int64_t loffset = taosLSeekFile(TD_FILE_PFILE(pTFile), offset, whence); int64_t loffset = taosLSeekFile(TD_TFILE_PFILE(pTFile), offset, whence);
if (loffset < 0) { if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -70,12 +69,12 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { ...@@ -70,12 +69,12 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
} }
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) { int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) {
ASSERT(TD_FILE_OPENED(pTFile)); ASSERT(TD_TFILE_OPENED(pTFile));
return taosFStatFile(pTFile->pFile, size, NULL); return taosFStatFile(pTFile->pFile, size, NULL);
} }
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) { int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_FILE_OPENED(pTFile)); ASSERT(TD_TFILE_OPENED(pTFile));
int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte); int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte);
if (nread < 0) { if (nread < 0) {
...@@ -108,7 +107,7 @@ int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) { ...@@ -108,7 +107,7 @@ int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) {
char buf[TD_FILE_HEAD_SIZE] = "\0"; char buf[TD_FILE_HEAD_SIZE] = "\0";
uint32_t _version; uint32_t _version;
ASSERT(TD_FILE_OPENED(pTFile)); ASSERT(TD_TFILE_OPENED(pTFile));
if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) {
return -1; return -1;
...@@ -133,7 +132,7 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) { ...@@ -133,7 +132,7 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) {
} }
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) { int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TD_FILE_OPENED(pTFile)); ASSERT(TD_TFILE_OPENED(pTFile));
int64_t toffset; int64_t toffset;
...@@ -141,6 +140,11 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) ...@@ -141,6 +140,11 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
return -1; return -1;
} }
#if 1
smaDebug("append to file %s, offset:%" PRIi64 " + nbyte:%" PRIi64 " =%" PRIi64, TD_TFILE_FULL_NAME(pTFile), toffset,
nbyte, toffset + nbyte);
#endif
ASSERT(pTFile->info.fsize == toffset); ASSERT(pTFile->info.fsize == toffset);
if (offset) { if (offset) {
...@@ -157,9 +161,9 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) ...@@ -157,9 +161,9 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset)
} }
int32_t tdOpenTFile(STFile *pTFile, int flags) { int32_t tdOpenTFile(STFile *pTFile, int flags) {
ASSERT(!TD_FILE_OPENED(pTFile)); ASSERT(!TD_TFILE_OPENED(pTFile));
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), flags); pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), flags);
if (pTFile->pFile == NULL) { if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -169,9 +173,9 @@ int32_t tdOpenTFile(STFile *pTFile, int flags) { ...@@ -169,9 +173,9 @@ int32_t tdOpenTFile(STFile *pTFile, int flags) {
} }
void tdCloseTFile(STFile *pTFile) { void tdCloseTFile(STFile *pTFile) {
if (TD_FILE_OPENED(pTFile)) { if (TD_TFILE_OPENED(pTFile)) {
taosCloseFile(&pTFile->pFile); taosCloseFile(&pTFile->pFile);
TD_FILE_SET_CLOSED(pTFile); TD_TFILE_SET_CLOSED(pTFile);
} }
} }
...@@ -183,8 +187,8 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) { ...@@ -183,8 +187,8 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
char fullname[TSDB_FILENAME_LEN]; char fullname[TSDB_FILENAME_LEN];
SDiskID did = {0}; SDiskID did = {0};
TD_FILE_SET_STATE(pTFile, TD_FILE_STATE_OK); TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
TD_FILE_SET_CLOSED(pTFile); TD_TFILE_SET_CLOSED(pTFile);
memset(&(pTFile->info), 0, sizeof(pTFile->info)); memset(&(pTFile->info), 0, sizeof(pTFile->info));
pTFile->info.magic = TD_FILE_INIT_MAGIC; pTFile->info.magic = TD_FILE_INIT_MAGIC;
...@@ -202,18 +206,18 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) { ...@@ -202,18 +206,18 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) { int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) {
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) { if (pTFile->pFile == NULL) {
if (errno == ENOENT) { if (errno == ENOENT) {
// Try to create directory recursively // Try to create directory recursively
char *s = strdup(TD_FILE_REL_NAME(pTFile)); char *s = strdup(TD_TFILE_REL_NAME(pTFile));
if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_FILE_DID(pTFile)) < 0) { if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_TFILE_DID(pTFile)) < 0) {
taosMemoryFreeClear(s); taosMemoryFreeClear(s);
return -1; return -1;
} }
taosMemoryFreeClear(s); taosMemoryFreeClear(s);
pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pTFile->pFile == NULL) { if (pTFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -240,7 +244,7 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp ...@@ -240,7 +244,7 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp
return 0; return 0;
} }
int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_FILE_F(pTFile)); } int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_TFILE_F(pTFile)); }
// smaXXXUtil ================ // smaXXXUtil ================
// ... // ...
\ No newline at end of file
...@@ -152,14 +152,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -152,14 +152,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
return pVnode; return pVnode;
_err: _err:
if (pVnode->pSma) smaClose(pVnode->pSma); if (pVnode->pSma) smaCloseEnv(pVnode->pSma);
if (pVnode->pQuery) vnodeQueryClose(pVnode); if (pVnode->pQuery) vnodeQueryClose(pVnode);
if (pVnode->pTq) tqClose(pVnode->pTq); if (pVnode->pTq) tqClose(pVnode->pTq);
if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pSma) smaCloseEx(pVnode->pSma);
if (pVnode->pMeta) metaClose(pVnode->pMeta); if (pVnode->pMeta) metaClose(pVnode->pMeta);
tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode); taosMemoryFree(pVnode);
return NULL; return NULL;
......
...@@ -243,7 +243,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -243,7 +243,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
char *syncNodeStr = sync2SimpleStr(pVnode->sync); char *syncNodeStr = sync2SimpleStr(pVnode->sync);
static int64_t vndTick = 0; static int64_t vndTick = 0;
if (++vndTick % 10 == 1) { if (++vndTick % 10 == 1) {
vGTrace("vgId:%d, sync heartbeat msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
} }
if (gRaftDetailLog) { if (gRaftDetailLog) {
char logBuf[512] = {0}; char logBuf[512] = {0};
......
...@@ -273,8 +273,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -273,8 +273,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
} }
// 1. close current opened time window // 1. close current opened time window
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId && if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
pResult->offset != pResultRowInfo->cur.offset))) {
SResultRowPosition pos = pResultRowInfo->cur; SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
releaseBufPage(pResultBuf, pPage); releaseBufPage(pResultBuf, pPage);
......
...@@ -191,6 +191,11 @@ bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); ...@@ -191,6 +191,11 @@ bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t uniqueFunction(SqlFunctionCtx *pCtx); int32_t uniqueFunction(SqlFunctionCtx *pCtx);
bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool modeFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t modeFunction(SqlFunctionCtx *pCtx);
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t twaFunction(SqlFunctionCtx *pCtx); int32_t twaFunction(SqlFunctionCtx *pCtx);
......
...@@ -1045,20 +1045,28 @@ static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int3 ...@@ -1045,20 +1045,28 @@ static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int3
return translateFirstLastImpl(pFunc, pErrBuf, len, false); return translateFirstLastImpl(pFunc, pErrBuf, len, false);
} }
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateUniqueMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isUnique) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) { if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
} }
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (!nodesExprHasColumn(pPara)) { if (!nodesExprHasColumn(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE must contain columns"); return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of %s must contain columns", isUnique ? "UNIQUE" : "MODE");
} }
pFunc->node.resType = ((SExprNode*)pPara)->resType; pFunc->node.resType = ((SExprNode*)pPara)->resType;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateUniqueMode(pFunc, pErrBuf, len, true);
}
static int32_t translateMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateUniqueMode(pFunc, pErrBuf, len, false);
}
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (numOfParams == 0 || numOfParams > 2) { if (numOfParams == 0 || numOfParams > 2) {
...@@ -2109,7 +2117,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2109,7 +2117,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "unique", .name = "unique",
.type = FUNCTION_TYPE_UNIQUE, .type = FUNCTION_TYPE_UNIQUE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC, FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC,
.translateFunc = translateUnique, .translateFunc = translateUnique,
.getEnvFunc = getUniqueFuncEnv, .getEnvFunc = getUniqueFuncEnv,
...@@ -2117,6 +2125,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2117,6 +2125,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = uniqueFunction, .processFunc = uniqueFunction,
.finalizeFunc = NULL .finalizeFunc = NULL
}, },
{
.name = "mode",
.type = FUNCTION_TYPE_MODE,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateMode,
.getEnvFunc = getModeFuncEnv,
.initFunc = modeFunctionSetup,
.processFunc = modeFunction,
.finalizeFunc = modeFinalize,
},
{ {
.name = "abs", .name = "abs",
.type = FUNCTION_TYPE_ABS, .type = FUNCTION_TYPE_ABS,
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#define TAIL_MAX_OFFSET 100 #define TAIL_MAX_OFFSET 100
#define UNIQUE_MAX_RESULT_SIZE (1024 * 1024 * 10) #define UNIQUE_MAX_RESULT_SIZE (1024 * 1024 * 10)
#define MODE_MAX_RESULT_SIZE UNIQUE_MAX_RESULT_SIZE
#define HLL_BUCKET_BITS 14 // The bits of the bucket #define HLL_BUCKET_BITS 14 // The bits of the bucket
#define HLL_DATA_BITS (64 - HLL_BUCKET_BITS) #define HLL_DATA_BITS (64 - HLL_BUCKET_BITS)
...@@ -246,6 +247,19 @@ typedef struct SUniqueInfo { ...@@ -246,6 +247,19 @@ typedef struct SUniqueInfo {
char pItems[]; char pItems[];
} SUniqueInfo; } SUniqueInfo;
typedef struct SModeItem {
int64_t count;
char data[];
} SModeItem;
typedef struct SModeInfo {
int32_t numOfPoints;
uint8_t colType;
int16_t colBytes;
SHashObj* pHash;
char pItems[];
} SModeInfo;
typedef struct SDerivInfo { typedef struct SDerivInfo {
double prevValue; // previous value double prevValue; // previous value
TSKEY prevTs; // previous timestamp TSKEY prevTs; // previous timestamp
...@@ -4694,21 +4708,100 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) { ...@@ -4694,21 +4708,100 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
return pInfo->numOfPoints; return pInfo->numOfPoints;
} }
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { bool getModeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SModeInfo) + MODE_MAX_RESULT_SIZE;
return true;
}
bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false;
}
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->numOfPoints = 0;
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
if (pInfo->pHash != NULL) {
taosHashClear(pInfo->pHash);
} else {
pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
return true;
}
static void doModeAdd(SModeInfo* pInfo, char* data, bool isNull) {
// ignore null elements
if (isNull) {
return;
}
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
if (pHashItem == NULL) {
int32_t size = sizeof(SModeItem) + pInfo->colBytes;
SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size);
memcpy(pItem->data, data, pInfo->colBytes);
pItem->count += 1;
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
pInfo->numOfPoints++;
} else {
(*pHashItem)->count += 1;
}
}
int32_t modeFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i);
doModeAdd(pInfo, data, colDataIsNull_s(pInputCol, i));
if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) {
taosHashCleanup(pInfo->pHash);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SET_VAL(pResInfo, 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t currentRow = pBlock->info.rows;
for (int32_t i = 0; i < pResInfo->numOfRes; ++i) { int32_t resIndex;
SUniqueItem* pItem = (SUniqueItem*)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes)); int32_t maxCount = 0;
colDataAppend(pCol, i, pItem->data, false); for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
// TODO: handle ts output SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
if (pItem->count > maxCount) {
maxCount = pItem->count;
resIndex = i;
} else if (pItem->count == maxCount) {
resIndex = -1;
}
} }
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
colDataAppend(pCol, currentRow, pResItem->data, (resIndex == -1) ? true : false);
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(STwaInfo); pEnv->calcMemSize = sizeof(STwaInfo);
return true; return true;
......
...@@ -2,12 +2,8 @@ ...@@ -2,12 +2,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include "tudf.h" #include "taosudf.h"
#undef malloc
#define malloc malloc
#undef free
#define free free
DLL_EXPORT int32_t udf1_init() { DLL_EXPORT int32_t udf1_init() {
return 0; return 0;
......
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <math.h>
#include "tudf.h" #include "taosudf.h"
#undef malloc
#define malloc malloc
#undef free
#define free free
DLL_EXPORT int32_t udf2_init() { DLL_EXPORT int32_t udf2_init() {
return 0; return 0;
......
...@@ -1360,6 +1360,83 @@ static int32_t eliminateSetOpOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLo ...@@ -1360,6 +1360,83 @@ static int32_t eliminateSetOpOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLo
return eliminateSetOpOptimizeImpl(pCxt, pLogicSubplan, pSetOpNode); return eliminateSetOpOptimizeImpl(pCxt, pLogicSubplan, pSetOpNode);
} }
//===================================================================================================================
// merge projects
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
return false;
}
SLogicNode *pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pChild) || 1 < LIST_LENGTH(pChild->pChildren) ||
NULL != pChild->pConditions || NULL != pNode->pLimit || NULL != pNode->pSlimit) {
return false;
}
return true;
}
typedef struct SMergeProjectionsContext {
SProjectLogicNode* pChildProj;
int32_t errCode;
} SMergeProjectionsContext;
static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
SMergeProjectionsContext* pCxt = pContext;
SProjectLogicNode* pChildProj = pCxt->pChildProj;
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
SNode* pTarget;
FOREACH(pTarget, ((SLogicNode*)pChildProj)->pTargets) {
if (nodesEqualNode(pTarget, *pNode)) {
SNode* pProjection;
FOREACH(pProjection, pChildProj->pProjections) {
if (0 == strcmp(((SColumnNode*)pTarget)->colName, ((SExprNode*)pProjection)->aliasName)) {
SNode* pExpr = nodesCloneNode(pProjection);
if (pExpr == NULL) {
pCxt->errCode = terrno;
return DEAL_RES_ERROR;
}
nodesDestroyNode(*pNode);
*pNode = pExpr;
}
}
}
}
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt);
int32_t code = cxt.errCode;
if (TSDB_CODE_SUCCESS == code) {
if (1 == LIST_LENGTH(pChild->pChildren)) {
SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0);
code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild);
} else { // no grand child
NODES_CLEAR_LIST(pSelfNode->pChildren);
}
}
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pChild->pChildren);
}
nodesDestroyNode((SNode*)pChild);
return code;
}
static int32_t mergeProjectsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SLogicNode* pProjectNode = optFindPossibleNode(pLogicSubplan->pNode, mergeProjectsMayBeOptimized);
if (NULL == pProjectNode) {
return TSDB_CODE_SUCCESS;
}
return mergeProjectsOptimizeImpl(pCxt, pLogicSubplan, pProjectNode);
}
// clang-format off // clang-format off
static const SOptimizeRule optimizeRuleSet[] = { static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, {.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
...@@ -1367,6 +1444,7 @@ static const SOptimizeRule optimizeRuleSet[] = { ...@@ -1367,6 +1444,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize}, {.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize}, {.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize},
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize} {.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize}
......
...@@ -57,7 +57,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI ...@@ -57,7 +57,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader); void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader);
void snapshotSenderStop(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender);
...@@ -82,7 +82,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from ...@@ -82,7 +82,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
......
...@@ -240,7 +240,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -240,7 +240,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL); ASSERT(pSender != NULL);
SSnapshot snapshot; SSnapshot snapshot = {.data = NULL,
.lastApplyIndex = SYNC_INDEX_INVALID,
.lastApplyTerm = 0,
.lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL; void* pReader = NULL;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader); ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 && if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 &&
...@@ -249,10 +252,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -249,10 +252,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
snapshotSenderStart(pSender, snapshot, pReader); snapshotSenderStart(pSender, snapshot, pReader);
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog);
} else { } else {
// no snapshot // no snapshot
if (pReader != NULL) { if (pReader != NULL) {
...@@ -260,23 +259,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -260,23 +259,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} }
} }
/*
bool hasSnapshot = syncNodeHasSnapshot(ths);
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
// start sending snapshot first time
// start here, stop by receiver
if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) &&
pMsg->privateTerm < pSender->privateTerm) {
snapshotSenderStart(pSender);
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog);
}
*/
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
// update nextIndex to sentryIndex // update nextIndex to sentryIndex
...@@ -300,5 +282,5 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -300,5 +282,5 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex); syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
return ret; return 0;
} }
\ No newline at end of file
...@@ -698,7 +698,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -698,7 +698,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else { } else {
ret = -1; ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); sError("sync propose not leader, %s", syncUtilState2String(pSyncNode->state));
goto _END; goto _END;
} }
...@@ -1414,46 +1414,59 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1414,46 +1414,59 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
int32_t userStrLen = strlen(str); int32_t userStrLen = strlen(str);
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = SYNC_INDEX_INVALID;
SyncIndex logBeginIndex = SYNC_INDEX_INVALID;
if (pSyncNode->pLogStore != NULL) {
logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
}
char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
char* printStr = "";
if (pCfgStr != NULL) {
printStr = pCfgStr;
}
if (userStrLen < 256) { if (userStrLen < 256) {
char logBuf[128 + 256]; char logBuf[256 + 256];
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, " "replica-num:%d, "
"lconfig:%ld, changing:%d", "lconfig:%ld, changing:%d, restore:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing); pSyncNode->changing, pSyncNode->restoreFinish, printStr);
} else { } else {
snprintf(logBuf, sizeof(logBuf), "%s", str); snprintf(logBuf, sizeof(logBuf), "%s", str);
} }
sError("%s", logBuf); sError("%s", logBuf);
} else { } else {
int len = 128 + userStrLen; int len = 256 + userStrLen;
char* s = (char*)taosMemoryMalloc(len); char* s = (char*)taosMemoryMalloc(len);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(s, len, snprintf(s, len,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, " "replica-num:%d, "
"lconfig:%ld, changing:%d", "lconfig:%ld, changing:%d, restore:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing); pSyncNode->changing, pSyncNode->restoreFinish, printStr);
} else { } else {
snprintf(s, len, "%s", str); snprintf(s, len, "%s", str);
} }
sError("%s", s); sError("%s", s);
taosMemoryFree(s); taosMemoryFree(s);
} }
taosMemoryFree(pCfgStr);
} }
char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
//---------------------------------- //----------------------------------
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
SyncSnapshotSend *pBeginMsg); SyncSnapshotSend *pBeginMsg);
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
//---------------------------------- //----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
...@@ -50,7 +51,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI ...@@ -50,7 +51,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false; pSender->finish = false;
} else { } else {
sError("vgId:%d cannot create snapshot sender", pSyncNode->vgId); sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
} }
return pSender; return pSender;
...@@ -127,7 +128,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void ...@@ -127,7 +128,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
} else { } else {
if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) { if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
sTrace("vgId:%d sync sender get cfg from local", pSender->pSyncNode->vgId); sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg; pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
getLastConfig = true; getLastConfig = true;
} }
...@@ -176,13 +177,13 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void ...@@ -176,13 +177,13 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
// event log // event log
do { do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender send"); char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(pSender->pSyncNode, eventLog); syncNodeEventLog(pSender->pSyncNode, eventLog);
taosMemoryFree(eventLog); taosMemoryFree(eventLog);
} while (0); } while (0);
} }
void snapshotSenderStop(SSyncSnapshotSender *pSender) { void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// close reader // close reader
if (pSender->pReader != NULL) { if (pSender->pReader != NULL) {
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
...@@ -199,6 +200,14 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) { ...@@ -199,6 +200,14 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) {
// update flag // update flag
pSender->start = false; pSender->start = false;
pSender->finish = finish;
// event log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
syncNodeEventLog(pSender->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} }
// when sender receive ack, call this function to send msg from seq // when sender receive ack, call this function to send msg from seq
...@@ -354,8 +363,9 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { ...@@ -354,8 +363,9 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
uint16_t port; uint16_t port;
syncUtilU642Addr(destId.addr, host, sizeof(host), &port); syncUtilU642Addr(destId.addr, host, sizeof(host), &port);
snprintf(s, len, "%s %p laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu replica-index:%d %s:%d", snprintf(s, len,
event, pSender, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, "%s {%p laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu replica-index:%d %s:%d}", event,
pSender, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->privateTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->privateTerm,
pSender->replicaIndex, host, port); pSender->replicaIndex, host, port);
...@@ -386,7 +396,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from ...@@ -386,7 +396,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
} else { } else {
sError("vgId:%d cannot create snapshot receiver", pSyncNode->vgId); sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
} }
return pReceiver; return pReceiver;
...@@ -409,9 +419,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { ...@@ -409,9 +419,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// static do start // static do start by privateTerm, pBeginMsg
// receive first snapshot data // receive first snapshot data
// privateTerm, pBeginMsg // write first block data
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
SyncSnapshotSend *pBeginMsg) { SyncSnapshotSend *pBeginMsg) {
// update state // update state
...@@ -419,6 +429,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p ...@@ -419,6 +429,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
pReceiver->privateTerm = privateTerm; pReceiver->privateTerm = privateTerm;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->fromId = pBeginMsg->srcId; pReceiver->fromId = pBeginMsg->srcId;
pReceiver->start = true;
// update snapshot // update snapshot
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
...@@ -429,8 +440,16 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p ...@@ -429,8 +440,16 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
ASSERT(pReceiver->pWriter == NULL); ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
ASSERT(ret == 0); ASSERT(ret == 0);
// event log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} }
// force stop
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data // force close, abandon incomplete data
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
...@@ -441,33 +460,35 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { ...@@ -441,33 +460,35 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
} }
pReceiver->start = false; pReceiver->start = false;
// event log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} }
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again // if already start, force close, start again
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
if (!snapshotReceiverIsStart(pReceiver)) { if (!snapshotReceiverIsStart(pReceiver)) {
// start // first start
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
pReceiver->start = true;
} else { } else {
// already start // already start
sInfo("snapshot recv, receiver already start"); sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId);
// force close, abandon incomplete data // force close, abandon incomplete data
int32_t ret = snapshotReceiverForceStop(pReceiver);
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
ASSERT(ret == 0);
pReceiver->pWriter = NULL;
// start again // start again
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
pReceiver->start = true;
} }
} }
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t ret = int32_t ret =
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
...@@ -477,8 +498,69 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { ...@@ -477,8 +498,69 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
pReceiver->start = false; pReceiver->start = false;
if (apply) { // event log
// ++(pReceiver->privateTerm); do {
SSnapshot snapshot;
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);
if (pReceiver->pWriter != NULL) {
int32_t code = 0;
if (pMsg->dataLen > 0) {
code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
pMsg->dataLen);
ASSERT(code == 0);
}
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
ASSERT(code == 0);
pReceiver->pWriter = NULL;
}
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
// update commit index
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
// reset wal
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
// event log
do {
SSnapshot snapshot;
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == pReceiver->ack + 1);
if (pReceiver->pWriter != NULL) {
if (pMsg->dataLen > 0) {
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
pMsg->data, pMsg->dataLen);
ASSERT(code == 0);
}
pReceiver->ack = pMsg->seq;
// event log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} }
} }
...@@ -548,7 +630,7 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) ...@@ -548,7 +630,7 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event)
uint16_t port; uint16_t port;
syncUtilU642Addr(fromId.addr, host, sizeof(host), &port); syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);
snprintf(s, len, "%s %p start:%d ack:%d term:%lu pterm:%lu from:%s:%d laindex:%ld laterm:%lu lcindex:%ld", event, snprintf(s, len, "%s {%p start:%d ack:%d term:%lu pterm:%lu from:%s:%d laindex:%ld laterm:%lu lcindex:%ld}", event,
pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port,
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex); pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);
...@@ -560,33 +642,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -560,33 +642,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// get receiver // get receiver
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
bool needRsp = false; bool needRsp = false;
int32_t writeCode = 0;
// state, term, seq/ack // state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
// begin // begin, no data
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg); snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg);
pReceiver->ack = pMsg->seq;
needRsp = true; needRsp = true;
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver begin");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// end, finish FSM // end, finish FSM
writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); snapshotReceiverFinish(pReceiver, pMsg);
ASSERT(writeCode == 0); snapshotReceiverStop(pReceiver);
needRsp = true;
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
// pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex);
// maybe update lastconfig // maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
...@@ -601,89 +670,74 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -601,89 +670,74 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex); syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
} }
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish, apply snapshot");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
pReceiver->pWriter = NULL;
snapshotReceiverStop(pReceiver, true);
pReceiver->ack = pMsg->seq;
needRsp = true;
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false); // force close
snapshotReceiverStop(pReceiver, false); snapshotReceiverForceStop(pReceiver);
needRsp = false; needRsp = false;
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force close");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
// transfering // transfering
if (pMsg->seq == pReceiver->ack + 1) { if (pMsg->seq == pReceiver->ack + 1) {
writeCode = snapshotReceiverGotData(pReceiver, pMsg);
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
ASSERT(writeCode == 0);
pReceiver->ack = pMsg->seq;
} }
needRsp = true; needRsp = true;
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} else { } else {
ASSERT(0); ASSERT(0);
} }
// send ack
if (needRsp) { if (needRsp) {
// build msg
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId; pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm; pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->ack = pReceiver->ack; pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = writeCode; pRspMsg->code = 0;
pRspMsg->privateTerm = pReceiver->privateTerm; pRspMsg->privateTerm = pReceiver->privateTerm; // receiver maybe already closed
// send msg
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncSnapshotRspDestroy(pRspMsg); syncSnapshotRspDestroy(pRspMsg);
} }
} else {
// error log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} }
} else { } else {
syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode); // error log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} }
return 0; return 0;
} }
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
ASSERT(pMsg->ack == pSender->seq);
pSender->ack = pMsg->ack;
++(pSender->seq);
}
// sender receives ack, set seq = ack + 1, send msg from seq // sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender // if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
sInfo("recv SyncSnapshotRsp maybe replica already dropped"); sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId);
return 0; return -1;
} }
// get sender // get sender
...@@ -695,27 +749,49 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { ...@@ -695,27 +749,49 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// receiver ack is finish, close sender // receiver ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
pSender->finish = true; snapshotSenderStop(pSender, true);
snapshotSenderStop(pSender);
return 0; return 0;
} }
// send next msg // send next msg
if (pMsg->ack == pSender->seq) { if (pMsg->ack == pSender->seq) {
// update sender ack // update sender ack
pSender->ack = pMsg->ack; snapshotSenderUpdateProgress(pSender, pMsg);
(pSender->seq)++;
snapshotSend(pSender); snapshotSend(pSender);
} else if (pMsg->ack == pSender->seq - 1) { } else if (pMsg->ack == pSender->seq - 1) {
snapshotReSend(pSender); snapshotReSend(pSender);
} else { } else {
ASSERT(0); do {
char logBuf[64];
snprintf(logBuf, sizeof(logBuf), "error ack, recv ack:%d, my seq:%d", pMsg->ack, pSender->seq);
char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return -1;
} }
} else {
// error log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return -1;
} }
} else { } else {
syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode); // error log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return -1;
} }
return 0; return 0;
......
...@@ -229,7 +229,7 @@ typedef struct { ...@@ -229,7 +229,7 @@ typedef struct {
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool); void transDestroyAsyncPool(SAsyncPool* pool);
int transSendAsync(SAsyncPool* pool, queue* mq); int transAsyncSend(SAsyncPool* pool, queue* mq);
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \ #define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \
do { \ do { \
......
...@@ -967,7 +967,7 @@ void cliSendQuit(SCliThrd* thrd) { ...@@ -967,7 +967,7 @@ void cliSendQuit(SCliThrd* thrd) {
// cli can stop gracefully // cli can stop gracefully
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
msg->type = Quit; msg->type = Quit;
transSendAsync(thrd->asyncPool, &msg->q); transAsyncSend(thrd->asyncPool, &msg->q);
} }
void cliWalkCb(uv_handle_t* handle, void* arg) { void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
...@@ -1138,7 +1138,7 @@ void transReleaseCliHandle(void* handle) { ...@@ -1138,7 +1138,7 @@ void transReleaseCliHandle(void* handle) {
cmsg->msg = tmsg; cmsg->msg = tmsg;
cmsg->type = Release; cmsg->type = Release;
transSendAsync(pThrd->asyncPool, &cmsg->q); transAsyncSend(pThrd->asyncPool, &cmsg->q);
return; return;
} }
...@@ -1171,7 +1171,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1171,7 +1171,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transSendAsync(pThrd->asyncPool, &(cliMsg->q)) == 0); ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0);
return; return;
} }
...@@ -1205,7 +1205,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1205,7 +1205,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transSendAsync(pThrd->asyncPool, &(cliMsg->q)); transAsyncSend(pThrd->asyncPool, &(cliMsg->q));
tsem_wait(sem); tsem_wait(sem);
tsem_destroy(sem); tsem_destroy(sem);
taosMemoryFree(sem); taosMemoryFree(sem);
...@@ -1234,7 +1234,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { ...@@ -1234,7 +1234,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid); tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid);
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transAsyncSend(thrd->asyncPool, &(cliMsg->q));
} }
} }
#endif #endif
...@@ -202,7 +202,7 @@ void transDestroyAsyncPool(SAsyncPool* pool) { ...@@ -202,7 +202,7 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
taosMemoryFree(pool->asyncs); taosMemoryFree(pool->asyncs);
taosMemoryFree(pool); taosMemoryFree(pool);
} }
int transSendAsync(SAsyncPool* pool, queue* q) { int transAsyncSend(SAsyncPool* pool, queue* q) {
int idx = pool->index; int idx = pool->index;
idx = idx % pool->nAsync; idx = idx % pool->nAsync;
// no need mutex here // no need mutex here
......
...@@ -999,7 +999,7 @@ void sendQuitToWorkThrd(SWorkThrd* pThrd) { ...@@ -999,7 +999,7 @@ void sendQuitToWorkThrd(SWorkThrd* pThrd) {
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
msg->type = Quit; msg->type = Quit;
tDebug("server send quit msg to work thread"); tDebug("server send quit msg to work thread");
transSendAsync(pThrd->asyncPool, &msg->q); transAsyncSend(pThrd->asyncPool, &msg->q);
} }
void transCloseServer(void* arg) { void transCloseServer(void* arg) {
...@@ -1070,7 +1070,7 @@ void transReleaseSrvHandle(void* handle) { ...@@ -1070,7 +1070,7 @@ void transReleaseSrvHandle(void* handle) {
m->type = Release; m->type = Release;
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return1: _return1:
...@@ -1099,7 +1099,7 @@ void transSendResponse(const STransMsg* msg) { ...@@ -1099,7 +1099,7 @@ void transSendResponse(const STransMsg* msg) {
STraceId* trace = (STraceId*)&msg->info.traceId; STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle); tGTrace("conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return1: _return1:
...@@ -1128,7 +1128,7 @@ void transRegisterMsg(const STransMsg* msg) { ...@@ -1128,7 +1128,7 @@ void transRegisterMsg(const STransMsg* msg) {
m->type = Register; m->type = Register;
tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
......
...@@ -637,5 +637,13 @@ class TDCom: ...@@ -637,5 +637,13 @@ class TDCom:
column_value_str = ", ".join(str(v) for v in column_value_list) column_value_str = ", ".join(str(v) for v in column_value_list)
insert_sql = f'insert into {dbname}.{tbname} values ({column_value_str});' insert_sql = f'insert into {dbname}.{tbname} values ({column_value_str});'
tsql.execute(insert_sql) tsql.execute(insert_sql)
def getOneRow(self, location, containElm):
res_list = list()
if 0 <= location < tdSql.queryRows:
for row in tdSql.queryResult:
if row[location] == containElm:
res_list.append(row)
return res_list
else:
tdLog.exit(f"getOneRow out of range: row_index={location} row_count={self.query_row}")
tdCom = TDCom() tdCom = TDCom()
...@@ -91,6 +91,7 @@ ...@@ -91,6 +91,7 @@
./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/distributeInterval0.sim ./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
# ./test.sh -f tsim/stream/distributesession0.sim # ./test.sh -f tsim/stream/distributesession0.sim
# ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim ./test.sh -f tsim/stream/session1.sim
......
...@@ -19,7 +19,7 @@ for /F "usebackq tokens=*" %%i in (!caseFile!) do ( ...@@ -19,7 +19,7 @@ for /F "usebackq tokens=*" %%i in (!caseFile!) do (
call :GetTimeSeconds !time! call :GetTimeSeconds !time!
set time1=!_timeTemp! set time1=!_timeTemp!
echo Start at !time! echo Start at !time!
call !line:./test.sh=wtest.bat! > result_!a!.txt 2>error_!a!.txt call !line:./test.sh=wtest.bat! > result_!a!.txt 2>error_!a!.txt || set /a errorlevel=8
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 && echo %%i >>failed.txt ) else ( call :colorEcho 0a "Success" &echo. ) if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 && echo %%i >>failed.txt ) else ( call :colorEcho 0a "Success" &echo. )
) )
) )
......
...@@ -56,8 +56,14 @@ echo charset UTF-8 >> %TAOS_CFG% ...@@ -56,8 +56,14 @@ echo charset UTF-8 >> %TAOS_CFG%
set "FILE_NAME=testSuite.sim" set "FILE_NAME=testSuite.sim"
if "%1" == "-f" set "FILE_NAME=%2" if "%1" == "-f" set "FILE_NAME=%2"
set FILE_NAME=%FILE_NAME:/=\%
start cmd /k "timeout /t 600 /NOBREAK && taskkill /f /im tsim.exe & exit /b"
rem echo FILE_NAME: %FILE_NAME% rem echo FILE_NAME: %FILE_NAME%
echo ExcuteCmd: %tsim% -c %CFG_DIR% -f %FILE_NAME% echo ExcuteCmd: %tsim% -c %CFG_DIR% -f %FILE_NAME%
set result=false
%TSIM% -c %CFG_DIR% -f %FILE_NAME% && set result=true
%TSIM% -c %CFG_DIR% -f %FILE_NAME% tasklist | grep timeout && taskkill /f /im timeout.exe
\ No newline at end of file if "%result%" == "true" ( exit /b ) else ( exit /b 8 )
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册