提交 59f4d2c9 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TAOSUDF_H
#define TDENGINE_TAOSUDF_H
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>
#include <taoserror.h>
#ifdef __cplusplus
extern "C" {
#endif
#if defined(__GNUC__)
#define FORCE_INLINE inline __attribute__((always_inline))
#else
#define FORCE_INLINE
#endif
typedef struct SUdfColumnMeta {
int16_t type;
int32_t bytes;
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
typedef struct SUdfColumnData {
int32_t numOfRows;
int32_t rowsAlloc;
union {
struct {
int32_t nullBitmapLen;
char *nullBitmap;
int32_t dataLen;
char *data;
} fixLenCol;
struct {
int32_t varOffsetsLen;
int32_t *varOffsets;
int32_t payloadLen;
char *payload;
int32_t payloadAllocLen;
} varLenCol;
};
} SUdfColumnData;
typedef struct SUdfColumn {
SUdfColumnMeta colMeta;
bool hasNull;
SUdfColumnData colData;
} SUdfColumn;
typedef struct SUdfDataBlock {
int32_t numOfRows;
int32_t numOfCols;
SUdfColumn **udfCols;
} SUdfDataBlock;
typedef struct SUdfInterBuf {
int32_t bufLen;
char* buf;
int8_t numOfResult; //zero or one
} SUdfInterBuf;
typedef void *UdfcFuncHandle;
// dynamic lib init and destroy
typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfDestroyFunc)();
#define UDF_MEMORY_EXP_GROWTH 1.5
#define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
} while (0)
#define udfColDataSetNotNull_f(pColumn, r_) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
typedef uint16_t VarDataLenT; // maxVarDataLen: 32767
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE)
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v))
#define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
#define IS_VAR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR) || ((t) == TSDB_DATA_TYPE_JSON))
#define IS_STR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR))
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
} else {
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
}
}
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
if (udfColDataIsNull_var(pColumn, row)) {
return true;
}
char* data = udfColDataGetData(pColumn, row);
return (*data == TSDB_DATA_TYPE_NULL);
} else {
return udfColDataIsNull_var(pColumn, row);
}
} else {
return udfColDataIsNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
if (newCapacity== 0 || newCapacity <= data->rowsAlloc) {
return TSDB_CODE_SUCCESS;
}
int allocCapacity = (data->rowsAlloc< 8) ? 8 : data->rowsAlloc;
while (allocCapacity < newCapacity) {
allocCapacity *= UDF_MEMORY_EXP_GROWTH;
}
if (IS_VAR_DATA_TYPE(meta->type)) {
char* tmp = (char*)realloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.varOffsets = (int32_t*)tmp;
data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity;
// for payload, add data in udfColDataAppend
} else {
char* tmp = (char*)realloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.nullBitmap = tmp;
data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity);
if (meta->type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
}
tmp = (char*)realloc(data->fixLenCol.data, allocCapacity* meta->bytes);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.data = tmp;
data->fixLenCol.dataLen = allocCapacity* meta->bytes;
}
data->rowsAlloc = allocCapacity;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row+1);
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row);
} else {
udfColDataSetNull_f(pColumn, row);
}
pColumn->hasNull = true;
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) {
udfColDataSetNull(pColumn, currentRow);
} else {
if (!isVarCol) {
udfColDataSetNotNull_f(pColumn, currentRow);
memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
} else {
int32_t dataLen = varDataTLen(pData);
if (meta->type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + sizeof(char));
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = sizeof(int64_t);
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = sizeof(char);
}
dataLen += sizeof(char);
}
if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) {
uint32_t newSize = data->varLenCol.payloadAllocLen;
if (newSize <= 1) {
newSize = 8;
}
while (newSize < data->varLenCol.payloadLen + dataLen) {
newSize = newSize * UDF_MEMORY_EXP_GROWTH;
}
char *buf = (char*)realloc(data->varLenCol.payload, newSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.payload = buf;
data->varLenCol.payloadAllocLen = newSize;
}
uint32_t len = data->varLenCol.payloadLen;
data->varLenCol.varOffsets[currentRow] = len;
memcpy(data->varLenCol.payload + len, pData, dataLen);
data->varLenCol.payloadLen += dataLen;
}
}
data->numOfRows = (currentRow + 1 > data->numOfRows) ? (currentRow+1) : data->numOfRows;
return 0;
}
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TAOSUDF_H
......@@ -16,6 +16,13 @@
#ifndef TDENGINE_TUDF_H
#define TDENGINE_TUDF_H
#undef malloc
#define malloc malloc
#undef free
#define free free
#undef realloc
#define alloc alloc
#include <taosudf.h>
#include <stdint.h>
#include <stdbool.h>
......@@ -36,56 +43,6 @@ extern "C" {
#endif
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
//======================================================================================
//begin API to taosd and qworker
typedef struct SUdfColumnMeta {
int16_t type;
int32_t bytes;
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
typedef struct SUdfColumnData {
int32_t numOfRows;
int32_t rowsAlloc;
union {
struct {
int32_t nullBitmapLen;
char *nullBitmap;
int32_t dataLen;
char *data;
} fixLenCol;
struct {
int32_t varOffsetsLen;
int32_t *varOffsets;
int32_t payloadLen;
char *payload;
int32_t payloadAllocLen;
} varLenCol;
};
} SUdfColumnData;
typedef struct SUdfColumn {
SUdfColumnMeta colMeta;
bool hasNull;
SUdfColumnData colData;
} SUdfColumn;
typedef struct SUdfDataBlock {
int32_t numOfRows;
int32_t numOfCols;
SUdfColumn **udfCols;
} SUdfDataBlock;
typedef struct SUdfInterBuf {
int32_t bufLen;
char* buf;
int8_t numOfResult; //zero or one
} SUdfInterBuf;
typedef void *UdfcFuncHandle;
//low level APIs
/**
......@@ -127,177 +84,6 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
int32_t cleanUpUdfs();
// end API to taosd and qworker
//=============================================================================================================================
// begin API to UDF writer.
// dynamic lib init and destroy
typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfDestroyFunc)();
//TODO: add API to check function arguments type, number etc.
#define UDF_MEMORY_EXP_GROWTH 1.5
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
} while (0)
#define udfColDataSetNotNull_f(pColumn, r_) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
} else {
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
}
}
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
if (udfColDataIsNull_var(pColumn, row)) {
return true;
}
char* data = udfColDataGetData(pColumn, row);
return (*data == TSDB_DATA_TYPE_NULL);
} else {
return udfColDataIsNull_var(pColumn, row);
}
} else {
return udfColDataIsNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
if (newCapacity== 0 || newCapacity <= data->rowsAlloc) {
return TSDB_CODE_SUCCESS;
}
int allocCapacity = TMAX(data->rowsAlloc, 8);
while (allocCapacity < newCapacity) {
allocCapacity *= UDF_MEMORY_EXP_GROWTH;
}
if (IS_VAR_DATA_TYPE(meta->type)) {
char* tmp = taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.varOffsets = (int32_t*)tmp;
data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity;
// for payload, add data in udfColDataAppend
} else {
char* tmp = taosMemoryRealloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.nullBitmap = tmp;
data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity);
if (meta->type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
}
tmp = taosMemoryRealloc(data->fixLenCol.data, allocCapacity* meta->bytes);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.data = tmp;
data->fixLenCol.dataLen = allocCapacity* meta->bytes;
}
data->rowsAlloc = allocCapacity;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row+1);
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row);
} else {
udfColDataSetNull_f(pColumn, row);
}
pColumn->hasNull = true;
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) {
udfColDataSetNull(pColumn, currentRow);
} else {
if (!isVarCol) {
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
} else {
int32_t dataLen = varDataTLen(pData);
if (meta->type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
}
dataLen += CHAR_BYTES;
}
if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) {
uint32_t newSize = data->varLenCol.payloadAllocLen;
if (newSize <= 1) {
newSize = 8;
}
while (newSize < data->varLenCol.payloadLen + dataLen) {
newSize = newSize * UDF_MEMORY_EXP_GROWTH;
}
char *buf = taosMemoryRealloc(data->varLenCol.payload, newSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.payload = buf;
data->varLenCol.payloadAllocLen = newSize;
}
uint32_t len = data->varLenCol.payloadLen;
data->varLenCol.varOffsets[currentRow] = len;
memcpy(data->varLenCol.payload + len, pData, dataLen);
data->varLenCol.payloadLen += dataLen;
}
}
data->numOfRows = TMAX(currentRow + 1, data->numOfRows);
return 0;
}
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
// end API to UDF writer
//=======================================================================================================================
#ifdef __cplusplus
}
......
......@@ -69,7 +69,7 @@ typedef struct SRpcMsg {
} SRpcMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
typedef bool (*RpcRfp)(int32_t code);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
......
......@@ -25,7 +25,7 @@ extern "C" {
// clang-format off
#define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code))))
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
#define TAOS_SUCCEEDED(err) ((err) >= 0)
#define TAOS_FAILED(err) ((err) < 0)
......@@ -35,7 +35,7 @@ const char* terrstr();
int32_t* taosGetErrno();
#define terrno (*taosGetErrno())
#define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
......
......@@ -84,9 +84,12 @@ void closeTransporter(STscObj *pTscObj) {
rpcClose(pTscObj->pAppInfo->pTransporter);
}
static bool clientRpcRfp(int32_t code) {
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
return false;
}
return true;
} else {
return false;
......
......@@ -248,9 +248,12 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
}
}
static bool rpcRfp(int32_t code) {
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
return false;
}
return true;
} else {
return false;
......
......@@ -273,8 +273,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
}
// 1. close current opened time window
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId &&
pResult->offset != pResultRowInfo->cur.offset))) {
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
releaseBufPage(pResultBuf, pPage);
......
......@@ -191,6 +191,11 @@ bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool modeFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t modeFunction(SqlFunctionCtx *pCtx);
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t twaFunction(SqlFunctionCtx *pCtx);
......
......@@ -1045,20 +1045,28 @@ static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int3
return translateFirstLastImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
static int32_t translateUniqueMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isUnique) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (!nodesExprHasColumn(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE must contain columns");
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of %s must contain columns", isUnique ? "UNIQUE" : "MODE");
}
pFunc->node.resType = ((SExprNode*)pPara)->resType;
return TSDB_CODE_SUCCESS;
}
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateUniqueMode(pFunc, pErrBuf, len, true);
}
static int32_t translateMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateUniqueMode(pFunc, pErrBuf, len, false);
}
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (numOfParams == 0 || numOfParams > 2) {
......@@ -2109,7 +2117,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "unique",
.type = FUNCTION_TYPE_UNIQUE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC,
.translateFunc = translateUnique,
.getEnvFunc = getUniqueFuncEnv,
......@@ -2117,6 +2125,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = uniqueFunction,
.finalizeFunc = NULL
},
{
.name = "mode",
.type = FUNCTION_TYPE_MODE,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateMode,
.getEnvFunc = getModeFuncEnv,
.initFunc = modeFunctionSetup,
.processFunc = modeFunction,
.finalizeFunc = modeFinalize,
},
{
.name = "abs",
.type = FUNCTION_TYPE_ABS,
......
......@@ -32,6 +32,7 @@
#define TAIL_MAX_OFFSET 100
#define UNIQUE_MAX_RESULT_SIZE (1024 * 1024 * 10)
#define MODE_MAX_RESULT_SIZE UNIQUE_MAX_RESULT_SIZE
#define HLL_BUCKET_BITS 14 // The bits of the bucket
#define HLL_DATA_BITS (64 - HLL_BUCKET_BITS)
......@@ -246,6 +247,19 @@ typedef struct SUniqueInfo {
char pItems[];
} SUniqueInfo;
typedef struct SModeItem {
int64_t count;
char data[];
} SModeItem;
typedef struct SModeInfo {
int32_t numOfPoints;
uint8_t colType;
int16_t colBytes;
SHashObj* pHash;
char pItems[];
} SModeInfo;
typedef struct SDerivInfo {
double prevValue; // previous value
TSKEY prevTs; // previous timestamp
......@@ -4694,21 +4708,100 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
return pInfo->numOfPoints;
}
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
bool getModeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SModeInfo) + MODE_MAX_RESULT_SIZE;
return true;
}
bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false;
}
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->numOfPoints = 0;
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
if (pInfo->pHash != NULL) {
taosHashClear(pInfo->pHash);
} else {
pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
return true;
}
static void doModeAdd(SModeInfo* pInfo, char* data, bool isNull) {
// ignore null elements
if (isNull) {
return;
}
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
if (pHashItem == NULL) {
int32_t size = sizeof(SModeItem) + pInfo->colBytes;
SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size);
memcpy(pItem->data, data, pInfo->colBytes);
pItem->count += 1;
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
pInfo->numOfPoints++;
} else {
(*pHashItem)->count += 1;
}
}
int32_t modeFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i);
doModeAdd(pInfo, data, colDataIsNull_s(pInputCol, i));
if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) {
taosHashCleanup(pInfo->pHash);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SET_VAL(pResInfo, 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t currentRow = pBlock->info.rows;
for (int32_t i = 0; i < pResInfo->numOfRes; ++i) {
SUniqueItem* pItem = (SUniqueItem*)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes));
colDataAppend(pCol, i, pItem->data, false);
// TODO: handle ts output
int32_t resIndex;
int32_t maxCount = 0;
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
if (pItem->count > maxCount) {
maxCount = pItem->count;
resIndex = i;
} else if (pItem->count == maxCount) {
resIndex = -1;
}
}
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
colDataAppend(pCol, currentRow, pResItem->data, (resIndex == -1) ? true : false);
return pResInfo->numOfRes;
}
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(STwaInfo);
return true;
......
......@@ -110,7 +110,7 @@ static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
static int32_t udfdConnectToMnode();
static int32_t udfdLoadUdf(char *udfName, SUdf *udf);
static bool udfdRpcRfp(int32_t code);
static bool udfdRpcRfp(int32_t code, tmsg_t msgType);
static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t udfdOpenClientRpc();
static int32_t udfdCloseClientRpc();
......@@ -546,9 +546,12 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
}
return 0;
}
static bool udfdRpcRfp(int32_t code) {
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
return false;
}
return true;
} else {
return false;
......
......@@ -2,12 +2,8 @@
#include <stdlib.h>
#include <stdio.h>
#include "tudf.h"
#include "taosudf.h"
#undef malloc
#define malloc malloc
#undef free
#define free free
DLL_EXPORT int32_t udf1_init() {
return 0;
......
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include "tudf.h"
#undef malloc
#define malloc malloc
#undef free
#define free free
#include "taosudf.h"
DLL_EXPORT int32_t udf2_init() {
return 0;
......
......@@ -1360,6 +1360,83 @@ static int32_t eliminateSetOpOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLo
return eliminateSetOpOptimizeImpl(pCxt, pLogicSubplan, pSetOpNode);
}
//===================================================================================================================
// merge projects
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
return false;
}
SLogicNode *pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pChild) || 1 < LIST_LENGTH(pChild->pChildren) ||
NULL != pChild->pConditions || NULL != pNode->pLimit || NULL != pNode->pSlimit) {
return false;
}
return true;
}
typedef struct SMergeProjectionsContext {
SProjectLogicNode* pChildProj;
int32_t errCode;
} SMergeProjectionsContext;
static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
SMergeProjectionsContext* pCxt = pContext;
SProjectLogicNode* pChildProj = pCxt->pChildProj;
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
SNode* pTarget;
FOREACH(pTarget, ((SLogicNode*)pChildProj)->pTargets) {
if (nodesEqualNode(pTarget, *pNode)) {
SNode* pProjection;
FOREACH(pProjection, pChildProj->pProjections) {
if (0 == strcmp(((SColumnNode*)pTarget)->colName, ((SExprNode*)pProjection)->aliasName)) {
SNode* pExpr = nodesCloneNode(pProjection);
if (pExpr == NULL) {
pCxt->errCode = terrno;
return DEAL_RES_ERROR;
}
nodesDestroyNode(*pNode);
*pNode = pExpr;
}
}
}
}
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt);
int32_t code = cxt.errCode;
if (TSDB_CODE_SUCCESS == code) {
if (1 == LIST_LENGTH(pChild->pChildren)) {
SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0);
code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild);
} else { // no grand child
NODES_CLEAR_LIST(pSelfNode->pChildren);
}
}
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pChild->pChildren);
}
nodesDestroyNode((SNode*)pChild);
return code;
}
static int32_t mergeProjectsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SLogicNode* pProjectNode = optFindPossibleNode(pLogicSubplan->pNode, mergeProjectsMayBeOptimized);
if (NULL == pProjectNode) {
return TSDB_CODE_SUCCESS;
}
return mergeProjectsOptimizeImpl(pCxt, pLogicSubplan, pProjectNode);
}
// clang-format off
static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
......@@ -1367,6 +1444,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize},
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize}
......
......@@ -57,7 +57,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader);
void snapshotSenderStop(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
......@@ -82,7 +82,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
......
......@@ -240,7 +240,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL);
SSnapshot snapshot;
SSnapshot snapshot = {.data = NULL,
.lastApplyIndex = SYNC_INDEX_INVALID,
.lastApplyTerm = 0,
.lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 &&
......@@ -249,10 +252,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ASSERT(pReader != NULL);
snapshotSenderStart(pSender, snapshot, pReader);
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog);
} else {
// no snapshot
if (pReader != NULL) {
......@@ -260,23 +259,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
}
/*
bool hasSnapshot = syncNodeHasSnapshot(ths);
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
// start sending snapshot first time
// start here, stop by receiver
if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) &&
pMsg->privateTerm < pSender->privateTerm) {
snapshotSenderStart(pSender);
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog);
}
*/
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
// update nextIndex to sentryIndex
......@@ -300,5 +282,5 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
return ret;
return 0;
}
\ No newline at end of file
......@@ -24,6 +24,7 @@
//----------------------------------
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
SyncSnapshotSend *pBeginMsg);
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
//----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
......@@ -50,7 +51,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false;
} else {
sError("vgId:%d cannot create snapshot sender", pSyncNode->vgId);
sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
}
return pSender;
......@@ -127,7 +128,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
syncEntryDestory(pEntry);
} else {
if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
sTrace("vgId:%d sync sender get cfg from local", pSender->pSyncNode->vgId);
sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
getLastConfig = true;
}
......@@ -176,13 +177,13 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
// event log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender send");
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(pSender->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// close reader
if (pSender->pReader != NULL) {
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
......@@ -199,6 +200,14 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) {
// update flag
pSender->start = false;
pSender->finish = finish;
// event log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
syncNodeEventLog(pSender->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
// when sender receive ack, call this function to send msg from seq
......@@ -386,7 +395,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
} else {
sError("vgId:%d cannot create snapshot receiver", pSyncNode->vgId);
sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
}
return pReceiver;
......@@ -409,9 +418,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// static do start
// static do start by privateTerm, pBeginMsg
// receive first snapshot data
// privateTerm, pBeginMsg
// write first block data
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
SyncSnapshotSend *pBeginMsg) {
// update state
......@@ -419,6 +428,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
pReceiver->privateTerm = privateTerm;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->fromId = pBeginMsg->srcId;
pReceiver->start = true;
// update snapshot
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
......@@ -429,8 +439,16 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
ASSERT(ret == 0);
// event log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
// force stop
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data
if (pReceiver->pWriter != NULL) {
......@@ -441,33 +459,35 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
}
pReceiver->start = false;
// event log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
if (!snapshotReceiverIsStart(pReceiver)) {
// start
// first start
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
pReceiver->start = true;
} else {
// already start
sInfo("snapshot recv, receiver already start");
sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId);
// force close, abandon incomplete data
int32_t ret =
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
ASSERT(ret == 0);
pReceiver->pWriter = NULL;
snapshotReceiverForceStop(pReceiver);
// start again
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
pReceiver->start = true;
}
}
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver->pWriter != NULL) {
int32_t ret =
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
......@@ -477,8 +497,69 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
pReceiver->start = false;
if (apply) {
// ++(pReceiver->privateTerm);
// event log
do {
SSnapshot snapshot;
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);
if (pReceiver->pWriter != NULL) {
int32_t code = 0;
if (pMsg->dataLen > 0) {
code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
pMsg->dataLen);
ASSERT(code == 0);
}
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
ASSERT(code == 0);
pReceiver->pWriter = NULL;
}
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
// update commit index
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
// reset wal
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
// event log
do {
SSnapshot snapshot;
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == pReceiver->ack + 1);
if (pReceiver->pWriter != NULL) {
if (pMsg->dataLen > 0) {
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
pMsg->data, pMsg->dataLen);
ASSERT(code == 0);
}
pReceiver->ack = pMsg->seq;
// event log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
}
......@@ -560,33 +641,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// get receiver
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
bool needRsp = false;
int32_t writeCode = 0;
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
// begin
// begin, no data
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg);
pReceiver->ack = pMsg->seq;
needRsp = true;
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver begin");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// end, finish FSM
writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
ASSERT(writeCode == 0);
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
// pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex);
snapshotReceiverFinish(pReceiver, pMsg);
snapshotReceiverStop(pReceiver);
needRsp = true;
// maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
......@@ -601,89 +669,74 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
}
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish, apply snapshot");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
pReceiver->pWriter = NULL;
snapshotReceiverStop(pReceiver, true);
pReceiver->ack = pMsg->seq;
needRsp = true;
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
snapshotReceiverStop(pReceiver, false);
// force close
snapshotReceiverForceStop(pReceiver);
needRsp = false;
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force close");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
// transfering
if (pMsg->seq == pReceiver->ack + 1) {
writeCode =
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
ASSERT(writeCode == 0);
pReceiver->ack = pMsg->seq;
snapshotReceiverGotData(pReceiver, pMsg);
}
needRsp = true;
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} else {
ASSERT(0);
}
// send ack
if (needRsp) {
// build msg
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->ack = pReceiver->ack;
pRspMsg->code = writeCode;
pRspMsg->privateTerm = pReceiver->privateTerm;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->privateTerm = pReceiver->privateTerm; // receiver maybe already closed
// send msg
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncSnapshotRspDestroy(pRspMsg);
}
} else {
// error log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
} else {
syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
// error log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
return 0;
}
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
ASSERT(pMsg->ack == pSender->seq);
pSender->ack = pMsg->ack;
++(pSender->seq);
}
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
sInfo("recv SyncSnapshotRsp maybe replica already dropped");
return 0;
sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId);
return -1;
}
// get sender
......@@ -695,27 +748,49 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// receiver ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
pSender->finish = true;
snapshotSenderStop(pSender);
snapshotSenderStop(pSender, true);
return 0;
}
// send next msg
if (pMsg->ack == pSender->seq) {
// update sender ack
pSender->ack = pMsg->ack;
(pSender->seq)++;
snapshotSenderUpdateProgress(pSender, pMsg);
snapshotSend(pSender);
} else if (pMsg->ack == pSender->seq - 1) {
snapshotReSend(pSender);
} else {
ASSERT(0);
do {
char logBuf[64];
snprintf(logBuf, sizeof(logBuf), "error ack, recv ack:%d, my seq:%d", pMsg->ack, pSender->seq);
char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return -1;
}
} else {
// error log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return -1;
}
} else {
syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
// error log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return -1;
}
return 0;
......
......@@ -229,7 +229,7 @@ typedef struct {
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool);
int transSendAsync(SAsyncPool* pool, queue* mq);
int transAsyncSend(SAsyncPool* pool, queue* mq);
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \
do { \
......
......@@ -52,7 +52,7 @@ typedef struct {
char user[TSDB_UNI_LEN]; // meter ID
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code);
bool (*retry)(int32_t code, tmsg_t msgType);
int index;
void* parent;
......
......@@ -164,11 +164,6 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
transSetDefaultAddr(thandle, ip, fqdn);
}
// void rpcSetMsgTraceId(SRpcMsg* pMsg, STraceId uid) {
// SRpcHandleInfo* pInfo = &pMsg->info;
// pInfo->traceId = uid;
//}
int32_t rpcInit() {
// impl later
return 0;
......
......@@ -967,7 +967,7 @@ void cliSendQuit(SCliThrd* thrd) {
// cli can stop gracefully
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
msg->type = Quit;
transSendAsync(thrd->asyncPool, &msg->q);
transAsyncSend(thrd->asyncPool, &msg->q);
}
void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
......@@ -1030,7 +1030,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
*/
STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
if (pTransInst->retry != NULL && pTransInst->retry(code)) {
if (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) {
pMsg->sent = 0;
pCtx->retryCnt += 1;
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
......@@ -1138,7 +1138,7 @@ void transReleaseCliHandle(void* handle) {
cmsg->msg = tmsg;
cmsg->type = Release;
transSendAsync(pThrd->asyncPool, &cmsg->q);
transAsyncSend(pThrd->asyncPool, &cmsg->q);
return;
}
......@@ -1171,7 +1171,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transSendAsync(pThrd->asyncPool, &(cliMsg->q)) == 0);
ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0);
return;
}
......@@ -1205,7 +1205,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transSendAsync(pThrd->asyncPool, &(cliMsg->q));
transAsyncSend(pThrd->asyncPool, &(cliMsg->q));
tsem_wait(sem);
tsem_destroy(sem);
taosMemoryFree(sem);
......@@ -1234,7 +1234,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid);
transSendAsync(thrd->asyncPool, &(cliMsg->q));
transAsyncSend(thrd->asyncPool, &(cliMsg->q));
}
}
#endif
......@@ -202,7 +202,7 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
taosMemoryFree(pool->asyncs);
taosMemoryFree(pool);
}
int transSendAsync(SAsyncPool* pool, queue* q) {
int transAsyncSend(SAsyncPool* pool, queue* q) {
int idx = pool->index;
idx = idx % pool->nAsync;
// no need mutex here
......
......@@ -999,7 +999,7 @@ void sendQuitToWorkThrd(SWorkThrd* pThrd) {
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
msg->type = Quit;
tDebug("server send quit msg to work thread");
transSendAsync(pThrd->asyncPool, &msg->q);
transAsyncSend(pThrd->asyncPool, &msg->q);
}
void transCloseServer(void* arg) {
......@@ -1070,7 +1070,7 @@ void transReleaseSrvHandle(void* handle) {
m->type = Release;
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId);
return;
_return1:
......@@ -1099,7 +1099,7 @@ void transSendResponse(const STransMsg* msg) {
STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId);
return;
_return1:
......@@ -1128,7 +1128,7 @@ void transRegisterMsg(const STransMsg* msg) {
m->type = Register;
tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId);
return;
......
......@@ -637,5 +637,13 @@ class TDCom:
column_value_str = ", ".join(str(v) for v in column_value_list)
insert_sql = f'insert into {dbname}.{tbname} values ({column_value_str});'
tsql.execute(insert_sql)
def getOneRow(self, location, containElm):
res_list = list()
if 0 <= location < tdSql.queryRows:
for row in tdSql.queryResult:
if row[location] == containElm:
res_list.append(row)
return res_list
else:
tdLog.exit(f"getOneRow out of range: row_index={location} row_count={self.query_row}")
tdCom = TDCom()
......@@ -85,6 +85,7 @@
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
# ./test.sh -f tsim/stream/distributesession0.sim
# ./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim
......
......@@ -19,7 +19,7 @@ for /F "usebackq tokens=*" %%i in (!caseFile!) do (
call :GetTimeSeconds !time!
set time1=!_timeTemp!
echo Start at !time!
call !line:./test.sh=wtest.bat! > result_!a!.txt 2>error_!a!.txt
call !line:./test.sh=wtest.bat! > result_!a!.txt 2>error_!a!.txt || set /a errorlevel=8
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 && echo %%i >>failed.txt ) else ( call :colorEcho 0a "Success" &echo. )
)
)
......
......@@ -56,8 +56,14 @@ echo charset UTF-8 >> %TAOS_CFG%
set "FILE_NAME=testSuite.sim"
if "%1" == "-f" set "FILE_NAME=%2"
set FILE_NAME=%FILE_NAME:/=\%
start cmd /k "timeout /t 600 /NOBREAK && taskkill /f /im tsim.exe & exit /b"
rem echo FILE_NAME: %FILE_NAME%
echo ExcuteCmd: %tsim% -c %CFG_DIR% -f %FILE_NAME%
set result=false
%TSIM% -c %CFG_DIR% -f %FILE_NAME% && set result=true
%TSIM% -c %CFG_DIR% -f %FILE_NAME%
\ No newline at end of file
tasklist | grep timeout && taskkill /f /im timeout.exe
if "%result%" == "true" ( exit /b ) else ( exit /b 8 )
\ No newline at end of file
......@@ -23,7 +23,6 @@ class TDTestCase:
self.time_step = 1000
def insert_datas_and_check_abs(self ,tbnums , rownums , time_step ):
tdLog.info(" prepare datas for auto check abs function ")
tdSql.execute(" create database test ")
......@@ -36,7 +35,7 @@ class TDTestCase:
ts = self.ts
for row in range(rownums):
ts += time_step*row
ts = self.ts + time_step*row
c1 = random.randint(0,10000)
c2 = random.randint(0,100000)
c3 = random.randint(0,125)
......@@ -538,25 +537,41 @@ class TDTestCase:
# tdSql.query(" select sum(c1) from stb1 where t1+10 >1; ") # taosd crash
tdSql.query("select c1 ,t1 from stb1 where t1 =0 ")
tdSql.checkRows(13)
# tdSql.query("select t1 from stb1 where t1 >0 ")
# tdSql.checkRows(3)
tdSql.query("select t1 from stb1 where t1 >0 ")
tdSql.checkRows(3)
tdSql.query("select t1 from stb1 where t1 =3 ")
tdSql.checkRows(1)
# tdSql.query("select sum(t1) from (select c1 ,t1 from stb1)")
# tdSql.checkData(0,0,61)
# tdSql.query("select distinct(c1) ,t1 from stb1")
# tdSql.checkRows(20)
# tdSql.query("select max(t2) , t1 ,c1, t2 from stb1")
# tdSql.checkData(0,3,33333)
tdSql.query("select max(t2) , t1 ,c1, t2 from stb1")
tdSql.checkData(0,3,33333)
# tag filter with abs function
# tdSql.query("select t1 from stb1 where abs(t1)=1")
# tdSql.checkRows(1)
tdSql.query("select t1 from stb1 where abs(t1)=1")
tdSql.checkRows(1)
tdSql.query("select t1 from stb1 where abs(c1+t1)=1")
tdSql.checkRows(1)
# tdSql.query("select t1 from stb1 where abs(t1+c1)=1")
# tdSql.checkRows(1)
tdSql.checkData(0,0,0)
tdSql.query(
"select abs(c1+t1)*t1 from stb1 where abs(c1)/floor(abs(ceil(t1))) ==1")
def support_super_table_test(self):
tdSql.execute(" use testdb ")
self.check_result_auto( " select c1 from stb1 order by ts " , "select abs(c1) from stb1 order by ts" )
self.check_result_auto( " select c1 from stb1 order by tbname " , "select abs(c1) from stb1 order by tbname" )
self.check_result_auto( " select c1 from stb1 where c1 > 0 order by tbname " , "select abs(c1) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select c1 from stb1 where c1 > 0 order by tbname " , "select abs(c1) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select t1,c1 from stb1 order by ts " , "select t1, abs(c1) from stb1 order by ts" )
self.check_result_auto( " select t2,c1 from stb1 order by tbname " , "select t2 ,abs(c1) from stb1 order by tbname" )
self.check_result_auto( " select t3,c1 from stb1 where c1 > 0 order by tbname " , "select t3 ,abs(c1) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select t4,c1 from stb1 where c1 > 0 order by tbname " , "select t4 , abs(c1) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -593,6 +608,10 @@ class TDTestCase:
self.insert_datas_and_check_abs(self.tb_nums,self.row_nums,self.time_step)
tdLog.printNoPrefix("==========step8: check abs result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
......@@ -479,6 +479,20 @@ class TDTestCase:
tdSql.execute('insert into tb3 values (now()+{}s, {}, {})'.format(i,PI*(5+i)/2 ,PI*(5+i)/2))
self.check_result_auto_acos("select num1,num2 from tb3;" , "select acos(num1),acos(num2) from tb3")
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto_acos( " select c5 from stb1 order by ts " , "select acos(c5) from stb1 order by ts" )
self.check_result_auto_acos( " select c5 from stb1 order by tbname " , "select acos(c5) from stb1 order by tbname" )
self.check_result_auto_acos( " select c5 from stb1 where c1 > 0 order by tbname " , "select acos(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_acos( " select c5 from stb1 where c1 > 0 order by tbname " , "select acos(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_acos( " select t1,c5 from stb1 order by ts " , "select acos(t1), acos(c5) from stb1 order by ts" )
self.check_result_auto_acos( " select t1,c5 from stb1 order by tbname " , "select acos(t1) ,acos(c5) from stb1 order by tbname" )
self.check_result_auto_acos( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select acos(t1) ,acos(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_acos( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select acos(t1) , acos(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -512,6 +526,14 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step7: acos filter query ============")
self.abs_func_filter()
tdLog.printNoPrefix("==========step8: check acos result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
......
......@@ -479,6 +479,20 @@ class TDTestCase:
tdSql.execute('insert into tb3 values (now()+{}s, {}, {})'.format(i,PI*(5+i)/2 ,PI*(5+i)/2))
self.check_result_auto_asin("select num1,num2 from tb3;" , "select asin(num1),asin(num2) from tb3")
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto_asin( " select c5 from stb1 order by ts " , "select asin(c5) from stb1 order by ts" )
self.check_result_auto_asin( " select c5 from stb1 order by tbname " , "select asin(c5) from stb1 order by tbname" )
self.check_result_auto_asin( " select c5 from stb1 where c1 > 0 order by tbname " , "select asin(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_asin( " select c5 from stb1 where c1 > 0 order by tbname " , "select asin(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_asin( " select t1,c5 from stb1 order by ts " , "select asin(t1), asin(c5) from stb1 order by ts" )
self.check_result_auto_asin( " select t1,c5 from stb1 order by tbname " , "select asin(t1) ,asin(c5) from stb1 order by tbname" )
self.check_result_auto_asin( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select asin(t1) ,asin(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_asin( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select asin(t1) , asin(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -512,6 +526,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step8: check asin result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
......
......@@ -476,6 +476,20 @@ class TDTestCase:
tdSql.execute('insert into tb3 values (now()+{}s, {}, {})'.format(i,PI*(5+i)/2 ,PI*(5+i)/2))
self.check_result_auto_atan("select num1,num2 from tb3;" , "select atan(num1),atan(num2) from tb3")
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto_atan( " select c5 from stb1 order by ts " , "select atan(c5) from stb1 order by ts" )
self.check_result_auto_atan( " select c5 from stb1 order by tbname " , "select atan(c5) from stb1 order by tbname" )
self.check_result_auto_atan( " select c5 from stb1 where c1 > 0 order by tbname " , "select atan(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_atan( " select c5 from stb1 where c1 > 0 order by tbname " , "select atan(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_atan( " select t1,c5 from stb1 order by ts " , "select atan(t1), atan(c5) from stb1 order by ts" )
self.check_result_auto_atan( " select t1,c5 from stb1 order by tbname " , "select atan(t1) ,atan(c5) from stb1 order by tbname" )
self.check_result_auto_atan( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select atan(t1) ,atan(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_atan( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select atan(t1) , atan(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -509,6 +523,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step8: check arctan result of stable query ============")
self.support_super_table_test()
def stop(self):
......
......@@ -427,6 +427,18 @@ class TDTestCase:
self.check_result_auto("select c1+1 ,c2 , c3*1 , c4/2, c5/2, c6 from sub1_bound" ,"select ceil(c1+1) ,ceil(c2) , ceil(c3*1) , ceil(c4/2), ceil(c5)/2, ceil(c6) from sub1_bound ")
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto( " select c5 from stb1 order by ts " , "select ceil(c5) from stb1 order by ts" )
self.check_result_auto( " select c5 from stb1 order by tbname " , "select ceil(c5) from stb1 order by tbname" )
self.check_result_auto( " select c5 from stb1 where c1 > 0 order by tbname " , "select ceil(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select c5 from stb1 where c1 > 0 order by tbname " , "select ceil(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select t1,c5 from stb1 order by ts " , "select ceil(t1), ceil(c5) from stb1 order by ts" )
self.check_result_auto( " select t1,c5 from stb1 order by tbname " , "select ceil(t1) ,ceil(c5) from stb1 order by tbname" )
self.check_result_auto( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select ceil(t1) ,ceil(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select ceil(t1) , ceil(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -455,6 +467,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step7: check ceil result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
......@@ -476,6 +476,19 @@ class TDTestCase:
tdSql.execute('insert into tb3 values (now()+{}s, {}, {})'.format(i,PI*(5+i)/2 ,PI*(5+i)/2))
self.check_result_auto_cos("select num1,num2 from tb3;" , "select cos(num1),cos(num2) from tb3")
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto_cos( " select c5 from stb1 order by ts " , "select cos(c5) from stb1 order by ts" )
self.check_result_auto_cos( " select c5 from stb1 order by tbname " , "select cos(c5) from stb1 order by tbname" )
self.check_result_auto_cos( " select c5 from stb1 where c1 > 0 order by tbname " , "select cos(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_cos( " select c5 from stb1 where c1 > 0 order by tbname " , "select cos(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_cos( " select t1,c5 from stb1 order by ts " , "select cos(t1), cos(c5) from stb1 order by ts" )
self.check_result_auto_cos( " select t1,c5 from stb1 order by tbname " , "select cos(t1) ,cos(c5) from stb1 order by tbname" )
self.check_result_auto_cos( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select cos(t1) ,cos(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_cos( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select cos(t1) , cos(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -509,6 +522,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step8: check cos result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
......
......@@ -427,6 +427,19 @@ class TDTestCase:
self.check_result_auto("select c1+1 ,c2 , c3*1 , c4/2, c5/2, c6 from sub1_bound" ,"select floor(c1+1) ,floor(c2) , floor(c3*1) , floor(c4/2), floor(c5)/2, floor(c6) from sub1_bound ")
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto( " select c5 from stb1 order by ts " , "select floor(c5) from stb1 order by ts" )
self.check_result_auto( " select c5 from stb1 order by tbname " , "select floor(c5) from stb1 order by tbname" )
self.check_result_auto( " select c5 from stb1 where c1 > 0 order by tbname " , "select floor(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select c5 from stb1 where c1 > 0 order by tbname " , "select floor(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select t1,c5 from stb1 order by ts " , "select floor(t1), floor(c5) from stb1 order by ts" )
self.check_result_auto( " select t1,c5 from stb1 order by tbname " , "select floor(t1) ,floor(c5) from stb1 order by tbname" )
self.check_result_auto( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select floor(t1) ,floor(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select floor(t1) , floor(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -454,6 +467,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step7: check floor result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
......@@ -670,7 +670,19 @@ class TDTestCase:
tdSql.checkData(0,2,math.log(32767.000000000,2))
tdSql.checkData(0,3,math.log(63.500000000,2))
tdSql.checkData(0,4,63.999401166)
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto_log2( " select c5 from stb1 order by ts " , "select log(c5,2) from stb1 order by ts" )
self.check_result_auto_log2( " select c5 from stb1 order by tbname " , "select log(c5,2) from stb1 order by tbname" )
self.check_result_auto_log2( " select c5 from stb1 where c1 > 0 order by tbname " , "select log(c5,2) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_log2( " select c5 from stb1 where c1 > 0 order by tbname " , "select log(c5,2) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_log2( " select t1,c5 from stb1 order by ts " , "select log(t1,2), log(c5,2) from stb1 order by ts" )
self.check_result_auto_log2( " select t1,c5 from stb1 order by tbname " , "select log(t1,2) ,log(c5,2) from stb1 order by tbname" )
self.check_result_auto_log2( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select log(t1,2) ,log(c5,2) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_log2( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select log(t1,2) , log(c5,2) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -707,7 +719,9 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step9: check log result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
......
......@@ -606,7 +606,23 @@ class TDTestCase:
tdSql.checkData(0,3,math.pow(63.500000000,2))
tdSql.checkData(0,5,None)
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto_pow2( " select c5 from stb1 order by ts " , "select pow(c5,2) from stb1 order by ts" )
self.check_result_auto_pow2( " select c5 from stb1 order by tbname " , "select pow(c5,2) from stb1 order by tbname" )
self.check_result_auto_pow2( " select c5 from stb1 where c1 > 0 order by tbname " , "select pow(c5,2) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_pow2( " select c5 from stb1 where c1 > 0 order by tbname " , "select pow(c5,2) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_pow2( " select t1,c5 from stb1 order by ts " , "select pow(t1,2), pow(c5,2) from stb1 order by ts" )
self.check_result_auto_pow2( " select t1,c5 from stb1 order by tbname " , "select pow(t1,2) ,pow(c5,2) from stb1 order by tbname" )
self.check_result_auto_pow2( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select pow(t1,2) ,pow(c5,2) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_pow2( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select pow(t1,2) , pow(c5,2) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -642,7 +658,9 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step9: check pow result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
......
......@@ -432,6 +432,20 @@ class TDTestCase:
self.check_result_auto("select c1+1 ,c2 , c3*1 , c4/2, c5/2, c6 from sub1_bound" ,"select round(c1+1) ,round(c2) , round(c3*1) , round(c4/2), round(c5)/2, round(c6) from sub1_bound ")
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto( " select c5 from stb1 order by ts " , "select round(c5) from stb1 order by ts" )
self.check_result_auto( " select c5 from stb1 order by tbname " , "select round(c5) from stb1 order by tbname" )
self.check_result_auto( " select c5 from stb1 where c1 > 0 order by tbname " , "select round(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select c5 from stb1 where c1 > 0 order by tbname " , "select round(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select t1,c5 from stb1 order by ts " , "select round(t1), round(c5) from stb1 order by ts" )
self.check_result_auto( " select t1,c5 from stb1 order by tbname " , "select round(t1) ,round(c5) from stb1 order by tbname" )
self.check_result_auto( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select round(t1) ,round(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select round(t1) , round(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -459,6 +473,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step7: check round result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
......@@ -476,6 +476,19 @@ class TDTestCase:
tdSql.execute('insert into tb3 values (now()+{}s, {}, {})'.format(i,PI*(5+i)/2 ,PI*(5+i)/2))
self.check_result_auto_sin("select num1,num2 from tb3;" , "select sin(num1),sin(num2) from tb3")
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto_sin( " select c5 from stb1 order by ts " , "select sin(c5) from stb1 order by ts" )
self.check_result_auto_sin( " select c5 from stb1 order by tbname " , "select sin(c5) from stb1 order by tbname" )
self.check_result_auto_sin( " select c5 from stb1 where c1 > 0 order by tbname " , "select sin(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_sin( " select c5 from stb1 where c1 > 0 order by tbname " , "select sin(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_sin( " select t1,c5 from stb1 order by ts " , "select sin(t1), sin(c5) from stb1 order by ts" )
self.check_result_auto_sin( " select t1,c5 from stb1 order by tbname " , "select sin(t1) ,sin(c5) from stb1 order by tbname" )
self.check_result_auto_sin( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select sin(t1) ,sin(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_sin( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select sin(t1) , sin(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -509,6 +522,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step8: check sin result of stable query ============")
self.support_super_table_test()
def stop(self):
......
......@@ -505,7 +505,19 @@ class TDTestCase:
tdSql.checkData(0,2,math.sqrt(32767.000000000))
tdSql.checkData(0,3,math.sqrt(63.500000000))
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto_sqrt( " select c5 from stb1 order by ts " , "select sqrt(c5) from stb1 order by ts" )
self.check_result_auto_sqrt( " select c5 from stb1 order by tbname " , "select sqrt(c5) from stb1 order by tbname" )
self.check_result_auto_sqrt( " select c5 from stb1 where c1 > 0 order by tbname " , "select sqrt(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_sqrt( " select c5 from stb1 where c1 > 0 order by tbname " , "select sqrt(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_sqrt( " select t1,c5 from stb1 order by ts " , "select sqrt(t1), sqrt(c5) from stb1 order by ts" )
self.check_result_auto_sqrt( " select t1,c5 from stb1 order by tbname " , "select sqrt(t1) ,sqrt(c5) from stb1 order by tbname" )
self.check_result_auto_sqrt( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select sqrt(t1) ,sqrt(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_sqrt( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select sqrt(t1) , sqrt(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -541,6 +553,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step9: check sqrt result of stable query ============")
self.support_super_table_test()
def stop(self):
......
......@@ -476,6 +476,20 @@ class TDTestCase:
tdSql.execute('insert into tb3 values (now()+{}s, {}, {})'.format(i,PI*(5+i)/2 ,PI*(5+i)/2))
self.check_result_auto_tan("select num1,num2 from tb3;" , "select tan(num1),tan(num2) from tb3")
def support_super_table_test(self):
tdSql.execute(" use db ")
self.check_result_auto_tan( " select c5 from stb1 order by ts " , "select tan(c5) from stb1 order by ts" )
self.check_result_auto_tan( " select c5 from stb1 order by tbname " , "select tan(c5) from stb1 order by tbname" )
self.check_result_auto_tan( " select c5 from stb1 where c1 > 0 order by tbname " , "select tan(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_tan( " select c5 from stb1 where c1 > 0 order by tbname " , "select tan(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_tan( " select t1,c5 from stb1 order by ts " , "select tan(t1), tan(c5) from stb1 order by ts" )
self.check_result_auto_tan( " select t1,c5 from stb1 order by tbname " , "select tan(t1) ,tan(c5) from stb1 order by tbname" )
self.check_result_auto_tan( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select tan(t1) ,tan(c5) from stb1 where c1 > 0 order by tbname" )
self.check_result_auto_tan( " select t1,c5 from stb1 where c1 > 0 order by tbname " , "select tan(t1) , tan(c5) from stb1 where c1 > 0 order by tbname" )
pass
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -509,6 +523,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step8: check tan result of stable query ============")
self.support_super_table_test()
def stop(self):
tdSql.close()
......
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db2',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'], ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.asyncInsertData(paraDict)
tdLog.info("create topics from stb with filter")
# queryString = "select ts, sin(c1), pow(c2,3) from %s.%s where t2 == 'beijing' or t2 == 'changsha'" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select * from %s.%s where t2 == 'beijing' or t2 == 'changsha'" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# start tmq consume processor
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicNameList[0]
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:2000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
# tmqCom.getStartCommitNotifyFromTmqsim()
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("create some new ctb")
paraDict['ctbStartIdx'] = paraDict['ctbStartIdx'] + paraDict['ctbNum']
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'], ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data into new ctb")
pThread = tmqCom.asyncInsertData(paraDict)
pThread.join()
tdLog.info("wait insert end")
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -159,7 +159,7 @@ class TMQCom:
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
return
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1):
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
tsql.execute("use %s" %dbName)
pre_create = "create table"
sql = pre_create
......@@ -168,8 +168,10 @@ class TMQCom:
tagValue = 'beijing'
if (i % 2 == 0):
tagValue = 'shanghai'
elif (i % 3 == 0):
tagValue = 'changsha'
sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i,stbName,i+1, tagValue)
sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i+ctbStartIdx,stbName,i+ctbStartIdx+1, tagValue)
if (i > 0) and (i%100 == 0):
tsql.execute(sql)
sql = pre_create
......@@ -235,7 +237,7 @@ class TMQCom:
tdLog.debug("insert data ............ [OK]")
return
def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,ctbStartIdx=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
......@@ -245,7 +247,7 @@ class TMQCom:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(ctbPrefix,i)
sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
......@@ -254,7 +256,7 @@ class TMQCom:
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx)
else:
sql = "insert into "
#end sql
......@@ -354,7 +356,10 @@ class TMQCom:
def threadFunctionForInsert(self, **paraDict):
# create new connector for new tdSql instance in my thread
newTdSql = tdCom.newTdSql()
self.insert_data_2(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
if 'ctbStartIdx' in paraDict.keys():
self.insert_data_2(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"])
else:
self.insert_data_2(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
return
def asyncInsertData(self, paraDict):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册