提交 af436875 编写于 作者: S Shengliang Guan

Merge branch '3.0' into test/jcy

......@@ -361,7 +361,7 @@ pipeline {
}
parallel {
stage('check docs') {
agent{label " worker03 || slave215 || slave217 || slave219 || Mac_catalina "}
agent{label " slave1_47 || slave1_48 || slave1_49 || slave1_52 || worker03 || slave215 || slave217 || slave219 || Mac_catalina "}
steps {
check_docs()
}
......@@ -407,7 +407,7 @@ pipeline {
}
}
stage('linux test') {
agent{label " worker03 || slave215 || slave217 || slave219 "}
agent{label " slave1_47 || slave1_48 || slave1_49 || slave1_52 || worker03 || slave215 || slave217 || slave219 "}
options { skipDefaultCheckout() }
when {
changeRequest()
......
......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG efa2a5f
GIT_TAG fab042d
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -73,7 +73,95 @@ TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
```c
{{#include examples/c/demo.c}}
```
格式化输出不同类型字段函数 taos_print_row
```c
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
int32_t len = 0;
for (int i = 0; i < num_fields; ++i) {
if (i > 0) {
str[len++] = ' ';
}
if (row[i] == NULL) {
len += sprintf(str + len, "%s", TSDB_DATA_NULL_STR);
continue;
}
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_UTINYINT:
len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_USMALLINT:
len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_UINT:
len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_UBIGINT:
len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(row[i]);
len += sprintf(str + len, "%f", fv);
} break;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(row[i]);
len += sprintf(str + len, "%lf", dv);
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
if (fields[i].type == TSDB_DATA_TYPE_BINARY) {
assert(charLen <= fields[i].bytes && charLen >= 0);
} else {
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0);
}
memcpy(str + len, row[i], charLen);
len += charLen;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
default:
break;
}
}
str[len] = 0;
return len;
}
```
</details>
### 异步查询示例
......
......@@ -90,6 +90,33 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
}
}
static FORCE_INLINE bool colDataIsNNull_s(const SColumnInfoData* pColumnInfoData, int32_t startIndex,
uint32_t nRows) {
if (!pColumnInfoData->hasNull) {
return false;
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
for (int32_t i = startIndex; i < nRows; ++i) {
if (!colDataIsNull_var(pColumnInfoData, i)) {
return false;
}
}
} else {
if (pColumnInfoData->nullbitmap == NULL) {
return false;
}
for (int32_t i = startIndex; i < nRows; ++i) {
if (!colDataIsNull_f(pColumnInfoData->nullbitmap, i)) {
return false;
}
}
}
return true;
}
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
SColumnDataAgg* pColAgg) {
if (!pColumnInfoData->hasNull) {
......
......@@ -27,17 +27,17 @@
extern "C" {
#endif
typedef struct SBuffer SBuffer;
typedef struct SSchema SSchema;
typedef struct STColumn STColumn;
typedef struct STSchema STSchema;
typedef struct SValue SValue;
typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder;
typedef struct STagVal STagVal;
typedef struct STag STag;
typedef struct SColData SColData;
typedef struct SBuffer SBuffer;
typedef struct SSchema SSchema;
typedef struct STColumn STColumn;
typedef struct STSchema STSchema;
typedef struct SValue SValue;
typedef struct SColVal SColVal;
typedef struct SRow SRow;
typedef struct SRowIter SRowIter;
typedef struct STagVal STagVal;
typedef struct STag STag;
typedef struct SColData SColData;
#define HAS_NONE ((uint8_t)0x1)
#define HAS_NULL ((uint8_t)0x2)
......@@ -68,13 +68,10 @@ struct SBuffer {
void tBufferDestroy(SBuffer *pBuffer);
int32_t tBufferInit(SBuffer *pBuffer, int64_t size);
int32_t tBufferPut(SBuffer *pBuffer, const void *pData, int64_t nData);
int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData);
// STSchema ================================
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
void tTSchemaDestroy(STSchema *pTSchema);
// SValue ================================
static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
void tDestroyTSchema(STSchema *pTSchema);
// SColVal ================================
#define CV_FLAG_VALUE ((int8_t)0x0)
......@@ -89,26 +86,14 @@ static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
#define COL_VAL_IS_NULL(CV) ((CV)->flag == CV_FLAG_NULL)
#define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE)
// STSRow2 ================================
#define TSROW_LEN(PROW, V) tGetI32v((uint8_t *)(PROW)->data, (V) ? &(V) : NULL)
#define TSROW_SVER(PROW, V) tGetI32v((PROW)->data + TSROW_LEN(PROW, NULL), (V) ? &(V) : NULL)
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow);
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
void tTSRowFree(STSRow2 *pRow);
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray);
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow);
// STSRowBuilder ================================
#define tsRowBuilderInit() ((STSRowBuilder){0})
#define tsRowBuilderClear(B) \
do { \
if ((B)->pBuf) { \
taosMemoryFree((B)->pBuf); \
} \
} while (0)
// SRow ================================
int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SBuffer *pBuffer);
void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// SRowIter ================================
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
void tRowIterClose(SRowIter **ppIter);
SColVal *tRowIterNext(SRowIter *pIter);
// STag ================================
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
......@@ -147,29 +132,17 @@ struct STSchema {
int32_t numOfCols;
int32_t version;
int32_t flen;
int32_t vlen;
int32_t tlen;
STColumn columns[];
};
#define TSROW_HAS_NONE ((uint8_t)0x1)
#define TSROW_HAS_NULL ((uint8_t)0x2U)
#define TSROW_HAS_VAL ((uint8_t)0x4U)
#define TSROW_KV_SMALL ((uint8_t)0x10U)
#define TSROW_KV_MID ((uint8_t)0x20U)
#define TSROW_KV_BIG ((uint8_t)0x40U)
#pragma pack(push, 1)
struct STSRow2 {
TSKEY ts;
uint8_t flags;
uint8_t data[];
};
#pragma pack(pop)
struct STSRowBuilder {
// STSRow2 tsRow;
int32_t szBuf;
uint8_t *pBuf;
struct SRow {
uint8_t flag;
uint8_t rsv;
uint16_t sver;
uint32_t len;
TSKEY ts;
uint8_t data[];
};
struct SValue {
......@@ -258,37 +231,17 @@ typedef struct {
int32_t nCols;
schema_ver_t version;
uint16_t flen;
int32_t vlen;
int32_t tlen;
STColumn *columns;
} STSchemaBuilder;
// use 2 bits for bitmap(default: STSRow/sub block)
#define TD_VTYPE_BITS 2
#define TD_VTYPE_PARTS 4 // PARTITIONS: 1 byte / 2 bits
#define TD_VTYPE_OPTR 3 // OPERATOR: 4 - 1, utilize to get remainder
#define TD_BITMAP_BYTES(cnt) (((cnt) + TD_VTYPE_OPTR) >> 2)
// use 1 bit for bitmap(super block)
#define TD_VTYPE_BITS_I 1
#define TD_VTYPE_PARTS_I 8 // PARTITIONS: 1 byte / 1 bit
#define TD_VTYPE_OPTR_I 7 // OPERATOR: 8 - 1, utilize to get remainder
#define TD_BITMAP_BYTES_I(cnt) (((cnt) + TD_VTYPE_OPTR_I) >> 3)
int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type) {
if (IS_VAR_DATA_TYPE(type)) {
return tGetBinary(p, &pValue->pData, pValue ? &pValue->nData : NULL);
} else {
memcpy(&pValue->val, p, tDataTypes[type].bytes);
return tDataTypes[type].bytes;
}
}
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version);
#endif
......
......@@ -55,6 +55,14 @@ typedef struct STSRow {
#define TD_ROW_TP 0x0U // default
#define TD_ROW_KV 0x01U
#define TD_VTYPE_PARTS 4 // PARTITIONS: 1 byte / 2 bits
#define TD_VTYPE_OPTR 3 // OPERATOR: 4 - 1, utilize to get remainder
#define TD_BITMAP_BYTES(cnt) (((cnt) + TD_VTYPE_OPTR) >> 2)
#define TD_VTYPE_PARTS_I 8 // PARTITIONS: 1 byte / 1 bit
#define TD_VTYPE_OPTR_I 7 // OPERATOR: 8 - 1, utilize to get remainder
#define TD_BITMAP_BYTES_I(cnt) (((cnt) + TD_VTYPE_OPTR_I) >> 3)
/**
* @brief value type
* - for data from client input and STSRow in memory, 3 types of value none/null/norm available
......@@ -244,7 +252,7 @@ int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pVal
*/
static FORCE_INLINE void *tdGetBitmapAddrTp(STSRow *pRow, uint32_t flen) {
// The primary TS key is stored separatedly.
return POINTER_SHIFT(TD_ROW_DATA(pRow), flen - sizeof(TSKEY));
return POINTER_SHIFT(TD_ROW_DATA(pRow), flen);
// return POINTER_SHIFT(pRow->ts, flen);
}
......
......@@ -41,6 +41,7 @@ extern "C" {
#define SNAPSHOT_WAIT_MS 1000 * 30
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
#define SYNC_HEART_TIMEOUT_MS 1000 * 8
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
......
......@@ -213,7 +213,7 @@ function install_bin() {
[ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${demoName} || :
[ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || :
[ -x ${install_main_dir}/bin/${dumpName} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName} || :
[ -x ${install_main_dir}/bin/${xname} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${xname} || :
[ -x ${install_main_dir}/bin/${xname} ] && ${csudo}ln -s ${install_main_dir}/bin/${xname} ${bin_link_dir}/${xname} || :
[ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -s ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || :
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
......
......@@ -1275,6 +1275,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
nVar++;
}
}
fLen -= sizeof(TSKEY);
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(numOfCols - 1);
......@@ -1333,7 +1334,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
}
}
offset += TYPE_BYTES[pColumn->type];
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
......@@ -1503,6 +1506,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
nVar++;
}
}
fLen -= sizeof(TSKEY);
int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
......@@ -1585,8 +1589,9 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
}
}
offset += TYPE_BYTES[pColumn->type];
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
......@@ -1803,6 +1808,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
nVar++;
}
}
fLen -= sizeof(TSKEY);
int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
......@@ -1888,8 +1894,9 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
}
}
offset += TYPE_BYTES[pColumn->type];
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
......
......@@ -652,7 +652,10 @@ int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity)
ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen);
}
memcpy(pCol->pData, pStart, colLength);
if (!colDataIsNNull_s(pCol, 0, pBlock->info.rows)) {
memcpy(pCol->pData, pStart, colLength);
}
pStart += pCol->info.bytes * capacity;
}
......@@ -2052,6 +2055,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
isStartKey = true;
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
offset, k);
continue; // offset should keep 0 for next column
} else if (colDataIsNull_s(pColInfoData, j)) {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NULL, NULL,
......
此差异已折叠。
......@@ -192,7 +192,7 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl
return true;
}
void *pBitmap = tdGetBitmapAddrTp(pRow, flen);
tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset - sizeof(TSKEY), colIdx);
tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset, colIdx);
return true;
}
......@@ -217,7 +217,7 @@ bool tdSTSRowIterFetch(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC
return false;
}
}
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal);
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal);
++pIter->colIdx;
} else if (TD_IS_KV_ROW(pIter->pRow)) {
return tdSTSRowIterGetKvVal(pIter, colId, &pIter->kvIdx, pVal);
......@@ -244,7 +244,7 @@ bool tdSTSRowIterNext(STSRowIter *pIter, SCellVal *pVal) {
}
if (TD_IS_TP_ROW(pIter->pRow)) {
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal);
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal);
} else if (TD_IS_KV_ROW(pIter->pRow)) {
tdSTSRowIterGetKvVal(pIter, pCol->colId, &pIter->kvIdx, pVal);
} else {
......@@ -412,7 +412,9 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
valType = TD_VTYPE_NULL;
} else if (IS_VAR_DATA_TYPE(pTColumn->type)) {
varDataSetLen(varBuf, pColVal->value.nData);
memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData);
if (pColVal->value.nData != 0) {
memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData);
}
val = varBuf;
} else {
val = (const void *)&pColVal->value.val;
......@@ -467,7 +469,7 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCell
#ifdef TD_SUPPORT_BITMAP
colIdx = POINTER_DISTANCE(pCol, pSchema->columns) / sizeof(STColumn);
#endif
tdGetTpRowValOfCol(pVal, pRow, pIter->pBitmap, pCol->type, pCol->offset - sizeof(TSKEY), colIdx - 1);
tdGetTpRowValOfCol(pVal, pRow, pIter->pBitmap, pCol->type, pCol->offset, colIdx - 1);
} else if (TD_IS_KV_ROW(pRow)) {
SKvRowIdx *pIdx = (SKvRowIdx *)taosbsearch(&colId, TD_ROW_COL_IDX(pRow), tdRowGetNCols(pRow), sizeof(SKvRowIdx),
compareKvRowColId, TD_EQ);
......@@ -755,11 +757,10 @@ int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const vo
int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData,
int8_t colType, int16_t colIdx, int32_t offset) {
if ((offset < (int32_t)sizeof(TSKEY)) || (colIdx < 1)) {
if (colIdx < 1) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
offset -= sizeof(TSKEY);
--colIdx;
#ifdef TD_SUPPORT_BITMAP
......@@ -851,7 +852,7 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
memset(pBuilder->pBitmap, TD_VTYPE_NONE_BYTE_II, pBuilder->nBitmaps);
#endif
// the primary TS key is stored separatedly
len = TD_ROW_HEAD_LEN + pBuilder->flen - sizeof(TSKEY) + pBuilder->nBitmaps;
len = TD_ROW_HEAD_LEN + pBuilder->flen + pBuilder->nBitmaps;
TD_ROW_SET_LEN(pBuilder->pBuf, len);
TD_ROW_SET_SVER(pBuilder->pBuf, pBuilder->sver);
break;
......@@ -1094,4 +1095,4 @@ void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColV
memcpy(&pColVal->value.val, cv.val, tDataTypes[pTColumn->type].bytes);
}
}
}
\ No newline at end of file
}
......@@ -61,7 +61,7 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = {
static float floatMin = -FLT_MAX, floatMax = FLT_MAX;
static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX;
FORCE_INLINE void *getDataMin(int32_t type, void* value) {
FORCE_INLINE void *getDataMin(int32_t type, void *value) {
switch (type) {
case TSDB_DATA_TYPE_FLOAT:
*(float *)value = floatMin;
......@@ -77,7 +77,7 @@ FORCE_INLINE void *getDataMin(int32_t type, void* value) {
return value;
}
FORCE_INLINE void *getDataMax(int32_t type, void* value) {
FORCE_INLINE void *getDataMax(int32_t type, void *value) {
switch (type) {
case TSDB_DATA_TYPE_FLOAT:
*(float *)value = floatMax;
......
......@@ -196,7 +196,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
code = 0;
......
......@@ -464,7 +464,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
code = 0;
......
......@@ -56,7 +56,7 @@ typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter;
typedef struct STSDBRowIter STSDBRowIter;
typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger;
typedef struct STsdbReadSnap STsdbReadSnap;
......@@ -111,9 +111,9 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
// SRowIter
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
SColVal *tRowIterNext(SRowIter *pIter);
// STSDBRowIter
void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger
int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
......@@ -562,7 +562,7 @@ struct SDFileSet {
SSttFile *aSttF[TSDB_MAX_STT_TRIGGER];
};
struct SRowIter {
struct STSDBRowIter {
TSDBROW *pRow;
STSchema *pTSchema;
SColVal colVal;
......
......@@ -1275,6 +1275,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->taskId);
atomic_sub_fetch_32(pRef, 1);
taosFreeQitem(pRefBlock);
continue;
}
......@@ -1292,7 +1293,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
taosMemoryFree(pDelBlock);
blockDataDestroy(pDelBlock);
taosMemoryFree(pRef);
}
......
......@@ -341,7 +341,7 @@ int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo
pSkmInfo->suid = suid;
pSkmInfo->uid = uid;
tTSchemaDestroy(pSkmInfo->pTSchema);
tDestroyTSchema(pSkmInfo->pTSchema);
code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -365,7 +365,7 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid
pCommitter->skmRow.suid = suid;
pCommitter->skmRow.uid = uid;
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
tDestroyTSchema(pCommitter->skmRow.pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -498,7 +498,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
#if 0
ASSERT(pCommitter->minKey <= pCommitter->nextKey && pCommitter->maxKey >= pCommitter->nextKey);
#endif
pCommitter->nextKey = TSKEY_MAX;
// Reader
......@@ -623,7 +623,8 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
}
return code;
}
......@@ -666,7 +667,8 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
}
return code;
}
......@@ -706,7 +708,8 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
}
return code;
}
......@@ -919,8 +922,8 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
#else
tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
#endif
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
tDestroyTSchema(pCommitter->skmTable.pTSchema);
tDestroyTSchema(pCommitter->skmRow.pTSchema);
}
static int32_t tsdbCommitData(SCommitter *pCommitter) {
......
......@@ -595,21 +595,21 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS
if (pBuilder->bi.minKey > kRow.ts) pBuilder->bi.minKey = kRow.ts;
if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts;
SRowIter iter = {0};
tRowIterInit(&iter, pRow, pTSchema);
STSDBRowIter iter = {0};
tsdbRowIterInit(&iter, pRow, pTSchema);
SColVal *pColVal = tRowIterNext(&iter);
SColVal *pColVal = tsdbRowIterNext(&iter);
for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) {
SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder);
while (pColVal && pColVal->cid < pDCBuilder->cid) {
pColVal = tRowIterNext(&iter);
pColVal = tsdbRowIterNext(&iter);
}
if (pColVal && pColVal->cid == pDCBuilder->cid) {
code = tDiskColAddVal(pDCBuilder, pColVal);
if (code) return code;
pColVal = tRowIterNext(&iter);
pColVal = tsdbRowIterNext(&iter);
} else {
code = tDiskColAddVal(pDCBuilder, &COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type));
if (code) return code;
......
......@@ -2432,7 +2432,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->capacity) {
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
......
......@@ -555,7 +555,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
}
tBlockDataDestroy(&pReader->bData, 1);
tTSchemaDestroy(pReader->skmTable.pTSchema);
tDestroyTSchema(pReader->skmTable.pTSchema);
// del
if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader);
......@@ -1416,7 +1416,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
taosArrayDestroy(pWriter->dReader.aBlockIdx);
tBlockDataDestroy(&pWriter->bData, 1);
tTSchemaDestroy(pWriter->skmTable.pTSchema);
tDestroyTSchema(pWriter->skmTable.pTSchema);
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
tFree(pWriter->aBuf[iBuf]);
......
......@@ -579,8 +579,8 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
}
// SRowIter ======================================================
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
// STSDBRowIter ======================================================
void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
pIter->pRow = pRow;
if (pRow->type == 0) {
ASSERT(pTSchema);
......@@ -594,7 +594,7 @@ void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
}
}
SColVal *tRowIterNext(SRowIter *pIter) {
SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
if (pIter->pRow->type == 0) {
if (pIter->i < pIter->pTSchema->numOfCols) {
tTSRowGetVal(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal);
......@@ -1084,11 +1084,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset);
cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData);
} else {
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes);
memcpy(&cv.value.val, pRow->data + pTColumn->offset, pTColumn->bytes);
}
code = tColDataAppendValue(pColData, &cv);
......@@ -1106,11 +1106,11 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset);
cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData);
} else {
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes);
memcpy(&cv.value.val, pRow->data + pTColumn->offset, pTColumn->bytes);
}
code = tColDataAppendValue(pColData, &cv);
......
......@@ -480,51 +480,6 @@ _error:
return code;
}
#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo) {
if (pQInfo->sql) {
int ms = 0;
char* pcnt = strstr(pQInfo->sql, " count(*)");
if (pcnt) return 0;
char* pos = strstr(pQInfo->sql, " t_");
if (pos) {
pos += 3;
ms = atoi(pos);
while (*pos >= '0' && *pos <= '9') {
pos++;
}
char unit_char = *pos;
if (unit_char == 'h') {
ms *= 3600 * 1000;
} else if (unit_char == 'm') {
ms *= 60 * 1000;
} else if (unit_char == 's') {
ms *= 1000;
}
}
if (ms == 0) return 0;
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
if (ms < 1000) {
taosMsleep(ms);
} else {
int used_ms = 0;
while (used_ms < ms) {
taosMsleep(1000);
used_ms += 1000;
if (isTaskKilled(pQInfo)) {
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
break;
}
}
}
}
return 1;
}
#endif
static void freeBlock(void* param) {
SSDataBlock* pBlock = *(SSDataBlock**)param;
blockDataDestroy(pBlock);
......
......@@ -936,6 +936,9 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
} else {
pDest->info.parTbName[0] = 0;
}
if (pParInfo->groupId && pDest->info.parTbName[0]) {
streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName);
}
/*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/
blockDataDestroy(pTmpBlock);
blockDataDestroy(pResBlock);
......
......@@ -54,22 +54,26 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown
SColumnNode* col2 = (SColumnNode*)pNode->pRight;
SColumnNode* leftTsCol = NULL;
SColumnNode* rightTsCol = NULL;
if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) {
ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId);
if (col1->dataBlockId == col2->dataBlockId ) {
leftTsCol = col1;
rightTsCol = col2;
} else {
ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId);
ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId);
leftTsCol = col2;
rightTsCol = col1;
if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) {
ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId);
leftTsCol = col1;
rightTsCol = col2;
} else {
ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId);
ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId);
leftTsCol = col2;
rightTsCol = col1;
}
}
setJoinColumnInfo(&pInfo->leftCol, leftTsCol);
setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
} else {
ASSERT(false);
}
}
}}
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
......
......@@ -47,6 +47,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t code = TSDB_CODE_SUCCESS;
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR;
......@@ -59,10 +60,10 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
if (!tagName) {
return buildInvalidOperationMsg(&pBuf, "out of memory");
code = buildInvalidOperationMsg(&pBuf, "out of memory");
goto end;
}
int32_t code = TSDB_CODE_SUCCESS;
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
bool isJson = false;
......@@ -77,6 +78,10 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
int32_t colLen = pTagSchema->bytes;
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
colLen = bind[c].length[0];
if ((colLen + VARSTR_HEADER_SIZE) > pTagSchema->bytes) {
code = buildInvalidOperationMsg(&pBuf, "tag length is too big");
goto end;
}
}
taosArrayPush(tagName, pTagSchema->name);
if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
......
......@@ -139,8 +139,8 @@ void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_i
if (i > 0) {
pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
pColList->cols[i].toffset = pColList->flen;
pColList->flen += TYPE_BYTES[type];
}
pColList->flen += TYPE_BYTES[type];
switch (type) {
case TSDB_DATA_TYPE_BINARY:
pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
......
......@@ -743,7 +743,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
QW_LOCK(QW_WRITE, &ctx->lock);
if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
// Note: query is not running anymore
QW_SET_PHASE(ctx, 0);
QW_UNLOCK(QW_WRITE, &ctx->lock);
......
......@@ -1758,18 +1758,45 @@ int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *
break;
}
if (IS_SIGNED_NUMERIC_TYPE(type)) {
int64_t *in = (int64_t *)pInputData->pData;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
int64_t *out = (int64_t *)pOutputData->pData;
*out += in[i];
if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
int8_t *in = (int8_t *)pInputData->pData;
*out += in[i];
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
int16_t *in = (int16_t *)pInputData->pData;
*out += in[i];
} else if (type == TSDB_DATA_TYPE_INT) {
int32_t *in = (int32_t *)pInputData->pData;
*out += in[i];
} else if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t *in = (int64_t *)pInputData->pData;
*out += in[i];
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t *in = (uint64_t *)pInputData->pData;
uint64_t *out = (uint64_t *)pOutputData->pData;
*out += in[i];
if (type == TSDB_DATA_TYPE_UTINYINT) {
uint8_t *in = (uint8_t *)pInputData->pData;
*out += in[i];
} else if (type == TSDB_DATA_TYPE_USMALLINT) {
uint16_t *in = (uint16_t *)pInputData->pData;
*out += in[i];
} else if (type == TSDB_DATA_TYPE_UINT) {
uint32_t *in = (uint32_t *)pInputData->pData;
*out += in[i];
} else if (type == TSDB_DATA_TYPE_UBIGINT) {
uint64_t *in = (uint64_t *)pInputData->pData;
*out += in[i];
}
} else if (IS_FLOAT_TYPE(type)) {
double *in = (double *)pInputData->pData;
double *out = (double *)pOutputData->pData;
*out += in[i];
if (type == TSDB_DATA_TYPE_FLOAT) {
float *in = (float *)pInputData->pData;
*out += in[i];
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double *in = (double *)pInputData->pData;
*out += in[i];
}
}
}
......
......@@ -232,6 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode);
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
......
......@@ -137,6 +137,7 @@ typedef struct SyncHeartbeatReply {
SyncTerm term;
SyncTerm privateTerm;
int64_t startTime;
int64_t timeStamp;
} SyncHeartbeatReply;
typedef struct SyncPreSnapshot {
......
......@@ -50,9 +50,10 @@ void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));
// int64_t timeNow = taosGetMonotonicMs();
int64_t timeNow = taosGetTimestampMs();
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
pSyncIndexMgr->startTimeArr[i] = 0;
pSyncIndexMgr->recvTimeArr[i] = 0;
pSyncIndexMgr->recvTimeArr[i] = timeNow;
}
/*
......@@ -147,7 +148,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa
return recvTime;
}
}
ASSERT(0);
return -1;
}
......
......@@ -639,6 +639,14 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
return -1;
}
// heartbeat timeout
if (syncNodeHeartbeatTimeout(pSyncNode)) {
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
return -1;
}
// optimized one replica
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
SyncIndex retIndex;
......@@ -2086,6 +2094,29 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
return code;
}
bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode) {
if (pSyncNode->replicaNum == 1) {
return false;
}
int32_t toCount = 0;
int64_t tsNow = taosGetTimestampMs();
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
if (recvTime == 0 || recvTime == -1) {
continue;
}
if (tsNow - recvTime > SYNC_HEART_TIMEOUT_MS) {
toCount++;
}
}
bool b = (toCount >= pSyncNode->quorum ? true : false);
return b;
}
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
int32_t ret = 0;
......@@ -2127,6 +2158,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->srcId = ths->myRaftId;
pMsgReply->term = ths->pRaftStore->currentTerm;
pMsgReply->privateTerm = 8864; // magic number
pMsgReply->timeStamp = taosGetTimestampMs();
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncNodeResetElectTimer(ths);
......@@ -2191,7 +2223,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncLogRecvHeartbeatReply(ths, pMsg, "");
// update last reply time, make decision whether the other node is alive or not
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->destId, pMsg->startTime);
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, pMsg->timeStamp);
return 0;
}
......
......@@ -424,8 +424,8 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s);
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->timeStamp, s);
}
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
......
......@@ -739,7 +739,6 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
va_start(ap, format);
vfprintf(pFile->fp, format, ap);
va_end(ap);
fflush(pFile->fp);
}
bool taosValidFile(TdFilePtr pFile) { return pFile != NULL && pFile->fd > 0; }
......
......@@ -231,7 +231,7 @@
,,y,script,./test.sh -f tsim/stream/windowClose.sim
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
,,y,script,./test.sh -f tsim/stream/sliding.sim
,,,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
......@@ -279,7 +279,7 @@
,,y,script,./test.sh -f tsim/stable/vnode3.sim
,,y,script,./test.sh -f tsim/stable/metrics_idx.sim
,,,script,./test.sh -f tsim/sma/drop_sma.sim
,,,script,./test.sh -f tsim/sma/sma_leak.sim
,,y,script,./test.sh -f tsim/sma/sma_leak.sim
,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
......@@ -406,38 +406,38 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShellNetChk.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/telemetry.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosdMonitor.py
,,n,system-test,python3 ./test.py -f 0-others/taosdShell.py -N 5 -M 3 -Q 3
,,n,system-test,python3 ./test.py -f 0-others/udfTest.py
,,n,system-test,python3 ./test.py -f 0-others/udf_create.py
,,n,system-test,python3 ./test.py -f 0-others/udf_restart_taosd.py
,,n,system-test,python3 ./test.py -f 0-others/cachemodel.py
,,n,system-test,python3 ./test.py -f 0-others/udf_cfg1.py
,,n,system-test,python3 ./test.py -f 0-others/udf_cfg2.py
,,n,system-test,python3 ./test.py -f 0-others/taosdShell.py -N 5 -M 3 -Q 3
,,,system-test,python3 ./test.py -f 0-others/sysinfo.py
,,,system-test,python3 ./test.py -f 0-others/user_control.py
,,,system-test,python3 ./test.py -f 0-others/fsync.py
,,,system-test,python3 ./test.py -f 0-others/compatibility.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/udf_cfg2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/cachemodel.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sysinfo.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
,,,system-test,python3 ./test.py -f 1-insert/alter_database.py
,,,system-test,python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
,,,system-test,python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
,,,system-test,python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
,,,system-test,python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py
#,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_table.py
,,,system-test,python3 ./test.py -f 1-insert/boundary.py
,,,system-test,python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
,,,system-test,python3 ./test.py -f 1-insert/table_comment.py
,,n,system-test,python3 ./test.py -f 1-insert/boundary.py
,,n,system-test,python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_comment.py
,,,system-test,python3 ./test.py -f 1-insert/time_range_wise.py
,,,system-test,python3 ./test.py -f 1-insert/block_wise.py
,,,system-test,python3 ./test.py -f 1-insert/create_retentions.py
,,,system-test,python3 ./test.py -f 1-insert/mutil_stage.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/mutil_stage.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_param_ttl.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_param_ttl.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data_muti_rows.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/db_tb_name_check.py
,,,system-test,python3 ./test.py -f 1-insert/database_pre_suf.py
,,,system-test,python3 ./test.py -f 1-insert/InsertFuturets.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/InsertFuturets.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py -R
......@@ -543,8 +543,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/min.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/min.py -R
,,,system-test,python3 ./test.py -f 2-query/mode.py
,,,system-test,python3 ./test.py -f 2-query/mode.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mode.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mode.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Now.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Now.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py
......@@ -563,8 +563,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sin.py -R
,,,system-test,python3 ./test.py -f 2-query/smaTest.py
,,,system-test,python3 ./test.py -f 2-query/smaTest.py -R
,,,system-test,python3 ./test.py -f 2-query/sml.py
,,,system-test,python3 ./test.py -f 2-query/sml.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/spread.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/spread.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sqrt.py
......@@ -611,21 +611,17 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -R
,,,system-test,python3 ./test.py -f 1-insert/update_data.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data.py
,,,system-test,python3 ./test.py -f 1-insert/tb_100w_data_order.py
,,,system-test,python3 ./test.py -f 1-insert/delete_stable.py
,,,system-test,python3 ./test.py -f 1-insert/delete_childtable.py
,,,system-test,python3 ./test.py -f 1-insert/delete_normaltable.py
,,,system-test,python3 ./test.py -f 1-insert/keep_expired.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_childtable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_normaltable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/keep_expired.py
,,,system-test,python3 ./test.py -f 1-insert/drop.py
,,,system-test,python3 ./test.py -f 1-insert/drop.py -N 3 -M 3 -i False -n 3
,,,system-test,python3 ./test.py -f 2-query/join2.py
,,,system-test,python3 ./test.py -f 2-query/union1.py
,,,system-test,python3 ./test.py -f 2-query/concat2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/join2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/union1.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat2.py
,,,system-test,python3 ./test.py -f 2-query/json_tag.py
,,,system-test,python3 ./test.py -f 2-query/nestedQuery.py
,,,system-test,python3 ./test.py -f 2-query/nestedQuery_str.py
......@@ -731,7 +727,9 @@
,,,system-test,python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,,system-test,python3 ./test.py -f 99-TDcase/TD-19201.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 2
,,,system-test,python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3
,,,system-test,python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ltrim.py -Q 2
......@@ -760,7 +758,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Today.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/min.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/mode.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mode.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/countAlwaysReturnValue.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last.py -Q 2
......@@ -797,15 +795,15 @@
,,,system-test,python3 ./test.py -f 2-query/stablity.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/stablity_1.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/avg.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/elapsed.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/csum.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/sample.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/function_diff.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/function_stateduration.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/statecount.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/statecount.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tail.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ttl_comment.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distribute_agg_count.py -Q 2
......@@ -854,7 +852,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Today.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/min.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/mode.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mode.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/countAlwaysReturnValue.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last.py -Q 3
......@@ -890,15 +888,15 @@
,,,system-test,python3 ./test.py -f 2-query/stablity.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/stablity_1.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/avg.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/elapsed.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/csum.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/sample.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/function_diff.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/function_stateduration.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/statecount.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/statecount.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tail.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ttl_comment.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distribute_agg_count.py -Q 3
......@@ -947,7 +945,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Today.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/min.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/mode.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mode.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/countAlwaysReturnValue.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last.py -Q 4
......@@ -983,16 +981,16 @@
,,,system-test,python3 ./test.py -f 2-query/stablity.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/stablity_1.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/avg.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/elapsed.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/csum.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/sample.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/cast.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/function_diff.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/function_stateduration.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/statecount.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/statecount.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tail.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ttl_comment.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distribute_agg_count.py -Q 4
......@@ -1009,24 +1007,29 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 4
#develop test
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/default_json.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/demo.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/insert_alltypes_json.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/invalid_commandline.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/json_tag.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/query_json.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sample_csv_json.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sml_json_alltypes.py
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/taosdemoTestQueryWithJson.py -R
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/telnet_tcp.py -R
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/default_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/demo.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/insert_alltypes_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/invalid_commandline.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/json_tag.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/query_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sample_csv_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sml_json_alltypes.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/taosdemoTestQueryWithJson.py -R
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/telnet_tcp.py -R
#docs-examples test
,,n,docs-examples-test,bash python.sh
......
......@@ -231,7 +231,7 @@ class TDTestCase:
finally:
if platform.system().lower() == 'windows':
tdLog.info("ps -a | grep taos | awk \'{print $2}\' | xargs kill -9")
# os.system('ps -a | grep taos | awk \'{print $2}\' | xargs kill -9')
os.system('ps -a | grep taos | awk \'{print $2}\' | xargs kill -9')
else:
tdLog.info("pkill -9 taos")
# os.system('pkill -9 taos')
......
......@@ -165,6 +165,7 @@ class TDTestCase:
# conn.execute("drop database if exists %s" % dbname)
conn.close()
tdLog.success("%s successfully executed" % __file__)
except Exception as err:
# conn.execute("drop database if exists %s" % dbname)
......
......@@ -239,6 +239,7 @@ class TDTestCase:
# conn.execute("drop database if exists %s" % dbname)
conn.close()
tdLog.success("%s successfully executed" % __file__)
except Exception as err:
# conn.execute("drop database if exists %s" % dbname)
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import random
import os
import time
import subprocess
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
updatecfgDict = {'maxSQLLength':1048576,'debugFlag': 143 ,"querySmaOptimize":1}
def init(self, conn, logSql, replicaVar):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.testcasePath = os.path.split(__file__)[0]
self.testcaseFilename = os.path.split(__file__)[-1]
os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
self.db = "insert_select"
def dropandcreateDB_random(self,database,n):
ts = 1604298064000
tdSql.execute('''drop database if exists %s ;''' %database)
tdSql.execute('''create database %s keep 36500 ;'''%(database))
tdSql.execute('''use %s;'''%database)
tdSql.execute('''create table %s.tb (ts timestamp , i tinyint );'''%database)
tdSql.execute('''create table %s.tb1 (speed timestamp , c1 int , c2 int , c3 int );'''%database)
sql_before = "insert into %s.tb1 values" %database
sql_value = ''
for i in range(n):
sql_value = sql_value +"(%d, %d, %d, %d)" % (ts + i, i, i, i)
sql = sql_before + sql_value
tdSql.execute(sql)
tdSql.query("select count(*) from %s.tb1;"%database)
sql_result = tdSql.getData(0,0)
tdLog.info("result: %s" %(sql_result))
tdSql.query("reset query cache;")
tdSql.query("insert into %s.tb1 select * from %s.tb1;"%(database,database))
tdSql.query("select count(*) from %s.tb1;"%database)
sql_result = tdSql.getData(0,0)
tdLog.info("result: %s" %(sql_result))
def users_bug_TD_20592(self,database):
tdSql.execute('''drop database if exists %s ;''' %database)
tdSql.execute('''create database %s keep 36500 ;'''%(database))
tdSql.execute('''use %s;'''%database)
tdSql.execute('''create table %s.sav_stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int, t2 int);'''%database)
tdSql.execute('''create table %s.tb1 using %s.sav_stb tags( 9 , 0);'''%(database,database))
tdSql.error('''insert into %s.tb1 (c8, c9) values(now, 1);'''%(database))
def run(self):
startTime = time.time()
os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
self.dropandcreateDB_random("%s" %self.db, random.randint(10000,30000))
#taos -f sql
print("taos -f sql start!")
taos_cmd1 = "taos -f %s/%s.sql" % (self.testcasePath,self.testcaseFilename)
_ = subprocess.check_output(taos_cmd1, shell=True)
print("taos -f sql over!")
self.users_bug_TD_20592("%s" %self.db)
#taos -f sql
print("taos -f sql start!")
taos_cmd1 = "taos -f %s/%s.sql" % (self.testcasePath,self.testcaseFilename)
_ = subprocess.check_output(taos_cmd1, shell=True)
print("taos -f sql over!")
endTime = time.time()
print("total time %ds" % (endTime - startTime))
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -26,7 +26,7 @@ from util.dnodes import *
class TDTestCase:
updatecfgDict = {'maxSQLLength':1048576,'debugFlag': 143 ,"querySmaOptimize":1}
def init(self, conn, logSql):
def init(self, conn, logSql, replicaVar):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
......@@ -109,7 +109,7 @@ class TDTestCase:
q_binary1 , q_nchar1 , q_binary2 , q_nchar2 , q_binary3 , q_nchar3 , q_binary4 , q_nchar4 , q_binary5 , q_nchar5 , q_binary6 , q_nchar6 , q_binary7 , q_nchar7, q_binary8 , q_nchar8) \
values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d, 'binary1.%s', 'nchar1.%s', 'binary2.%s', 'nchar2.%s', 'binary3.%s', 'nchar3.%s', \
'binary4.%s', 'nchar4.%s', 'binary5.%s', 'nchar5.%s', 'binary6.%s', 'nchar6.%s', 'binary7.%s', 'nchar7.%s', 'binary8.%s', 'nchar8.%s') ;'''
% (database,ts + i*1000, fake.random_int(min=-2147483647, max=2147483647, step=1),
% (database,ts + i*1000+1, fake.random_int(min=-2147483647, max=2147483647, step=1),
fake.random_int(min=-9223372036854775807, max=9223372036854775807, step=1),
fake.random_int(min=-32767, max=32767, step=1) , fake.random_int(min=-127, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() ,
......@@ -118,7 +118,7 @@ class TDTestCase:
q_binary1 , q_nchar1 , q_binary2 , q_nchar2 , q_binary3 , q_nchar3 , q_binary4 , q_nchar4 , q_binary5 , q_nchar5 , q_binary6 , q_nchar6 , q_binary7 , q_nchar7, q_binary8 , q_nchar8) \
values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d, 'binary1.%s', 'nchar1.%s', 'binary2.%s', 'nchar2.%s', 'binary3.%s', 'nchar3.%s', \
'binary4.%s', 'nchar4.%s', 'binary5.%s', 'nchar5.%s', 'binary6.%s', 'nchar6.%s', 'binary7.%s', 'nchar7.%s', 'binary8.%s', 'nchar8.%s') ;'''
% (database,ts + i*1000, fake.random_int(min=-2147483647, max=2147483647, step=1) ,
% (database,ts + i*1000+1, fake.random_int(min=-2147483647, max=2147483647, step=1) ,
fake.random_int(min=-9223372036854775807, max=9223372036854775807, step=1) ,
fake.random_int(min=-32767, max=32767, step=1) , fake.random_int(min=-127, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i, fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() ,
......@@ -128,7 +128,7 @@ class TDTestCase:
q_binary1 , q_nchar1 , q_binary2 , q_nchar2 , q_binary3 , q_nchar3 , q_binary4 , q_nchar4 , q_binary5 , q_nchar5 , q_binary6 , q_nchar6 , q_binary7 , q_nchar7, q_binary8 , q_nchar8)\
values(%d, %d, %d, %d, %d, %f, %f, 1, 'binary.%s', 'nchar.%s', %d, 'binary1.%s', 'nchar1.%s', 'binary2.%s', 'nchar2.%s', 'binary3.%s', 'nchar3.%s', \
'binary4.%s', 'nchar4.%s', 'binary5.%s', 'nchar5.%s', 'binary6.%s', 'nchar6.%s', 'binary7.%s', 'nchar7.%s', 'binary8.%s', 'nchar8.%s') ;'''
% (database,ts + i*1000, fake.random_int(min=0, max=2147483647, step=1),
% (database,ts + i*1000+2, fake.random_int(min=0, max=2147483647, step=1),
fake.random_int(min=0, max=9223372036854775807, step=1),
fake.random_int(min=0, max=32767, step=1) , fake.random_int(min=0, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i, fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() ,
......@@ -137,7 +137,7 @@ class TDTestCase:
q_binary1 , q_nchar1 , q_binary2 , q_nchar2 , q_binary3 , q_nchar3 , q_binary4 , q_nchar4 , q_binary5 , q_nchar5 , q_binary6 , q_nchar6 , q_binary7 , q_nchar7, q_binary8 , q_nchar8) \
values(%d, %d, %d, %d, %d, %f, %f, 1, 'binary.%s', 'nchar.%s', %d, 'binary1.%s', 'nchar1.%s', 'binary2.%s', 'nchar2.%s', 'binary3.%s', 'nchar3.%s', \
'binary4.%s', 'nchar4.%s', 'binary5.%s', 'nchar5.%s', 'binary6.%s', 'nchar6.%s', 'binary7.%s', 'nchar7.%s', 'binary8.%s', 'nchar8.%s') ;'''
% (database,ts + i*1000, fake.random_int(min=0, max=2147483647, step=1),
% (database,ts + i*1000+2, fake.random_int(min=0, max=2147483647, step=1),
fake.random_int(min=0, max=9223372036854775807, step=1),
fake.random_int(min=0, max=32767, step=1) , fake.random_int(min=0, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i, fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() ,
......@@ -147,7 +147,7 @@ class TDTestCase:
q_binary1 , q_nchar1 , q_binary2 , q_nchar2 , q_binary3 , q_nchar3 , q_binary4 , q_nchar4 , q_binary5 , q_nchar5 , q_binary6 , q_nchar6 , q_binary7 , q_nchar7, q_binary8 , q_nchar8) \
values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d, 'binary1.%s', 'nchar1.%s', 'binary2.%s', 'nchar2.%s', 'binary3.%s', 'nchar3.%s', \
'binary4.%s', 'nchar4.%s', 'binary5.%s', 'nchar5.%s', 'binary6.%s', 'nchar6.%s', 'binary7.%s', 'nchar7.%s', 'binary8.%s', 'nchar8.%s') ;'''
% (database,ts + i*1000, fake.random_int(min=-0, max=2147483647, step=1),
% (database,ts + i*1000+3, fake.random_int(min=-0, max=2147483647, step=1),
fake.random_int(min=-0, max=9223372036854775807, step=1),
fake.random_int(min=-0, max=32767, step=1) , fake.random_int(min=-0, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i, fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() ,
......@@ -157,7 +157,7 @@ class TDTestCase:
q_binary1 , q_nchar1 , q_binary2 , q_nchar2 , q_binary3 , q_nchar3 , q_binary4 , q_nchar4 , q_binary5 , q_nchar5 , q_binary6 , q_nchar6 , q_binary7 , q_nchar7, q_binary8 , q_nchar8) \
values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d, 'binary1.%s', 'nchar1.%s', 'binary2.%s', 'nchar2.%s', 'binary3.%s', 'nchar3.%s', \
'binary4.%s', 'nchar4.%s', 'binary5.%s', 'nchar5.%s', 'binary6.%s', 'nchar6.%s', 'binary7.%s', 'nchar7.%s', 'binary8.%s', 'nchar8.%s') ;'''
% (database,ts + i*1000 +1, fake.random_int(min=-0, max=2147483647, step=1),
% (database,ts + i*1000 +4, fake.random_int(min=-0, max=2147483647, step=1),
fake.random_int(min=-0, max=9223372036854775807, step=1),
fake.random_int(min=-0, max=32767, step=1) , fake.random_int(min=-0, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i, fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() ,
......@@ -167,7 +167,7 @@ class TDTestCase:
q_binary1 , q_nchar1 , q_binary2 , q_nchar2 , q_binary3 , q_nchar3 , q_binary4 , q_nchar4 , q_binary5 , q_nchar5 , q_binary6 , q_nchar6 , q_binary7 , q_nchar7, q_binary8 , q_nchar8) \
values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d, 'binary1.%s', 'nchar1.%s', 'binary2.%s', 'nchar2.%s', 'binary3.%s', 'nchar3.%s', \
'binary4.%s', 'nchar4.%s', 'binary5.%s', 'nchar5.%s', 'binary6.%s', 'nchar6.%s', 'binary7.%s', 'nchar7.%s', 'binary8.%s', 'nchar8.%s') ;'''
% (database,ts + i*1000 +10, fake.random_int(min=-0, max=2147483647, step=1),
% (database,ts + i*1000 +5, fake.random_int(min=-0, max=2147483647, step=1),
fake.random_int(min=-0, max=9223372036854775807, step=1),
fake.random_int(min=-0, max=32767, step=1) , fake.random_int(min=-0, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i, fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() ,
......@@ -182,7 +182,7 @@ class TDTestCase:
fake = Faker('zh_CN')
fake_data = fake.random_int(min=1, max=20, step=1)
tdLog.info("\n=============constant(%s)_check ====================\n" %func)
tdSql.execute(" create sma index sma_index_name1 on stable_1 function(%s(%s)) interval(%dm); " %(func,column,fake_data))
tdSql.execute(" create sma index %s.sma_index_name1 on %s.stable_1 function(%s(%s)) interval(%dm); " %(database,database,func,column,fake_data))
sql = " select %s(%s) from %s.stable_1 interval(1m) "%(func,column,database)
tdLog.info(sql)
tdSql.query(sql)
......@@ -205,7 +205,7 @@ class TDTestCase:
tdLog.info("\n=============drop index ====================\n")
tdSql.execute(" drop index sma_index_name1;")
tdSql.execute(" drop index %s.sma_index_name1;" %database)
tdSql.query(sql)
queryRows = len(tdSql.queryResult)
......@@ -214,7 +214,30 @@ class TDTestCase:
drop_index_value = tdSql.queryResult[i][0]
self.value_check(flush_before_value,drop_index_value)
def constant_speical_check(self,database,func,column):
tdLog.info("\n=============constant(%s)_check ====================\n" %column)
sql_no_from = " select %s(%s) ; "%(func,column)
tdLog.info(sql_no_from)
tdSql.query(sql_no_from)
queryRows = len(tdSql.queryResult)
for i in range(queryRows):
print("row=%d, result=%s " %(i,tdSql.queryResult[i][0]))
flush_before_value_no_from = tdSql.queryResult[i][0]
tdLog.info("\n=============flush database ====================\n")
tdSql.execute(" flush database %s;" %database)
tdSql.query(sql_no_from)
queryRows = len(tdSql.queryResult)
for i in range(queryRows):
print("row=%d, result=%s " %(i,tdSql.queryResult[i][0]))
flush_after_value_no_from = tdSql.queryResult[i][0]
self.value_check(flush_before_value_no_from,flush_after_value_no_from)
def constant_check(self,database,func,column):
tdLog.info("\n=============constant(%s)_check ====================\n" %column)
......@@ -253,7 +276,7 @@ class TDTestCase:
print("row=%d, result=%s " %(i,tdSql.queryResult[i][0]))
flush_after_value_no_from = tdSql.queryResult[i][0]
self.value_check(flush_before_value_no_from,flush_after_value_no_from)
#self.value_check(flush_before_value_no_from,flush_after_value_no_from)#越界后值不唯一
def constant_table_check(self,database,func,column):
tdLog.info("\n=============constant(%s)_check ====================\n" %column)
......@@ -292,7 +315,7 @@ class TDTestCase:
print("row=%d, result=%s " %(i,tdSql.queryResult[i][0]))
flush_after_value_no_from = tdSql.queryResult[i][0]
self.value_check(flush_before_value_no_from,flush_after_value_no_from)
#self.value_check(flush_before_value_no_from,flush_after_value_no_from)#越界后值不唯一
def constant_str_check(self,database,func,column):
tdLog.info("\n=============constant(%s)_check ====================\n" %column)
......@@ -338,7 +361,7 @@ class TDTestCase:
def derivative_sql(self,database):
fake = Faker('zh_CN')
fake_data = fake.random_int(min=-9223372036854775807, max=9223372036854775807, step=1)
fake_data = fake.random_int(min=1, max=10000000000000, step=1)
tdLog.info("\n=============derivative sql ====================\n" )
sql = " select derivative(%s,%ds,0) from %s.stable_1 "%('q_smallint',fake_data,database)
self.derivative_data_check("%s" %self.db,"%s" %sql)
......@@ -493,11 +516,11 @@ class TDTestCase:
def value_check(self,flush_before_value,flush_after_value):
# if flush_before_value==flush_after_value:
# tdLog.info(f"checkEqual success, flush_before_value={flush_before_value},flush_after_value={flush_after_value}")
# else :
# tdLog.exit(f"checkEqual error, flush_before_value=={flush_before_value},flush_after_value={flush_after_value}")
pass
if flush_before_value==flush_after_value:
tdLog.info(f"checkEqual success, flush_before_value={flush_before_value},flush_after_value={flush_after_value}")
else :
tdLog.exit(f"checkEqual error, flush_before_value=={flush_before_value},flush_after_value={flush_after_value}")
#pass
def run(self):
fake = Faker('zh_CN')
......@@ -511,6 +534,11 @@ class TDTestCase:
self.dropandcreateDB_random("%s" %self.db, 1,2)
self.constant_speical_check("%s" %self.db,'','%d' %fake_data)
self.constant_speical_check("%s" %self.db,'','%f' %fake_float)
self.constant_speical_check("%s" %self.db,'','\'%s\'' %fake_str)
self.constant_speical_check("%s" %self.db,'','NULL')
#TD-19818
self.func_index_check("%s" %self.db,'max','q_int')
self.func_index_check("%s" %self.db,'max','q_bigint')
......
......@@ -27,7 +27,7 @@ class TDTestCase:
tdLog.info(cmdStr)
ret = os.system(cmdStr)
if ret != 0:
tdLog.exit("sml_test failed")
tdLog.info("sml_test ret != 0")
# tdSql.execute('use sml_db')
tdSql.query(f"select * from {dbname}.t_b7d815c9222ca64cdf2614c61de8f211")
......
......@@ -54,9 +54,9 @@ class TDTestCase:
time.sleep(1)
tdLog.debug("............... waiting for all dnodes ready!")
tdLog.info("==============create two new mnodes ========")
tdSql.execute("create mnode on dnode 2")
tdSql.execute("create mnode on dnode 3")
# tdLog.info("==============create two new mnodes ========")
# tdSql.execute("create mnode on dnode 2")
# tdSql.execute("create mnode on dnode 3")
self.check3mnode()
return
......@@ -179,17 +179,20 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 100000,
'rowsPerTbl': 40000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'pollDelay': 30,
'showMsg': 1,
'showRow': 1}
if self.replicaVar == 3:
paraDict["rowsPerTbl"] = 20000
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=self.replicaVar)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
......@@ -198,7 +201,9 @@ class TDTestCase:
pThread = tmqCom.asyncInsertData(paraDict)
tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
# queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
......
......@@ -84,6 +84,7 @@ do
if [ $AsanFileLen -gt 10 ]; then
break
fi
sleep 1
done
AsanFileSuccessLen=`grep -w successfully $AsanFile | wc -l`
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册