diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 920eb0c01d95276f58719a3203f99bbdca4eb93a..32e9b1152061d054f253d3bb431cc7d6159f8c7d 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -127,6 +127,25 @@ def pre_test(){ ''' return 1 } +def pre_test_build_mac() { + sh ''' + hostname + date + ''' + sh ''' + cd ${WK} + rm -rf debug + mkdir debug + ''' + sh ''' + cd ${WK}/debug + cmake .. + make -j8 + ''' + sh ''' + date + ''' +} def pre_test_win(){ bat ''' hostname @@ -334,6 +353,17 @@ pipeline { } } } + stage('mac test') { + agent{label " Mac_catalina "} + steps { + catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') { + timeout(time: 20, unit: 'MINUTES'){ + pre_test() + pre_test_build_mac() + } + } + } + } stage('linux test') { agent{label " worker03 || slave215 || slave217 || slave219 "} options { skipDefaultCheckout() } diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index 72878d0d853e2067bdd73d42bb4b001ccfdb5295..cbf13924d73e9f2c6b5366e9f6be6254062e84b3 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -87,8 +87,8 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) { static void dmCleanupProcQueue(SProcQueue *queue) {} static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) { - const void *pHead = pMsg; - const void *pBody = pMsg->pCont; + const void * pHead = pMsg; + const void * pBody = pMsg->pCont; const int16_t rawHeadLen = sizeof(SRpcMsg); const int32_t rawBodyLen = pMsg->contLen; const int16_t headLen = CEIL8(rawHeadLen); @@ -257,7 +257,7 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) { proc->wrapper = pWrapper; proc->name = pWrapper->name; - SShm *shm = &proc->shm; + SShm * shm = &proc->shm; int32_t cstart = 0; int32_t csize = CEIL8(shm->size / 2); int32_t pstart = csize; @@ -281,13 +281,13 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) { } static void *dmConsumChildQueue(void *param) { - SProc *proc = param; + SProc * proc = param; SMgmtWrapper *pWrapper = proc->wrapper; - SProcQueue *queue = proc->cqueue; + SProcQueue * queue = proc->cqueue; int32_t numOfMsgs = 0; int32_t code = 0; EProcFuncType ftype = DND_FUNC_REQ; - SRpcMsg *pMsg = NULL; + SRpcMsg * pMsg = NULL; dDebug("node:%s, start to consume from cqueue", proc->name); do { @@ -324,13 +324,13 @@ static void *dmConsumChildQueue(void *param) { } static void *dmConsumParentQueue(void *param) { - SProc *proc = param; + SProc * proc = param; SMgmtWrapper *pWrapper = proc->wrapper; - SProcQueue *queue = proc->pqueue; + SProcQueue * queue = proc->pqueue; int32_t numOfMsgs = 0; int32_t code = 0; EProcFuncType ftype = DND_FUNC_REQ; - SRpcMsg *pMsg = NULL; + SRpcMsg * pMsg = NULL; dDebug("node:%s, start to consume from pqueue", proc->name); do { @@ -353,7 +353,7 @@ static void *dmConsumParentQueue(void *param) { rpcRegisterBrokenLinkArg(pMsg); } else if (ftype == DND_FUNC_RELEASE) { dmRemoveProcRpcHandle(proc, pMsg->info.handle); - rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code); + rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER); } else { dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype); rpcFreeCont(pMsg->pCont); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 35d478177a3d77e5f8d05020d366b6e430df6946..df3c9c4e88f1d59ac23f1f56080236c40fdb729a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -245,7 +245,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { SRpcMsg msg = {.code = type, .info = *pHandle}; dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE); } else { - rpcReleaseHandle(pHandle->handle, type); + rpcReleaseHandle(pHandle, type); } } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c140de24d88d45e9bc86b05bf8e1bc4c6a755b95..a21e97d04999b99a6a175af4befaa250b3376c34 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -262,13 +262,17 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) +#define EPSET_IS_VALID(epSet) ((epSet) != NULL && (epSet)->numOfEps != 0) #define EPSET_GET_SIZE(epSet) (epSet)->numOfEps #define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) #define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port) -#define EPSET_FORWARD_INUSE(epSet) \ - do { \ - (epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \ +#define EPSET_FORWARD_INUSE(epSet) \ + do { \ + if ((epSet)->numOfEps != 0) { \ + (epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \ + } \ } while (0) + #define EPSET_DEBUG_STR(epSet, tbuf) \ do { \ int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \ @@ -512,7 +516,6 @@ static void allocConnRef(SCliConn* conn, bool update) { } static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { - // assert(0); return; } SCliThrd* thrd = conn->hostThrd; @@ -668,7 +671,6 @@ static void cliSendCb(uv_write_t* req, int status) { void cliSend(SCliConn* pConn) { CONN_HANDLE_BROKEN(pConn); - // assert(taosArrayGetSize(pConn->cliMsgs) > 0); assert(!transQueueEmpty(&pConn->cliMsgs)); SCliMsg* pCliMsg = NULL; @@ -810,6 +812,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); + if (!EPSET_IS_VALID(&pCtx->epSet)) { + destroyCmsg(pMsg); + tError("invalid epset"); + return; + } bool ignore = false; SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); @@ -1077,12 +1084,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } else { cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT); if (pCtx->retryCnt < pCtx->retryLimit) { - addConnToPool(pThrd->pool, pConn); if (pResp->contLen == 0) { EPSET_FORWARD_INUSE(&pCtx->epSet); } else { - tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet); + if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet) < 0) { + tError("%s conn %p failed to deserialize epset", CONN_GET_INST_LABEL(pConn)); + } } + addConnToPool(pThrd->pool, pConn); transFreeMsg(pResp->pCont); cliSchedMsgToNextNode(pMsg, pThrd); return -1; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d32156dd0d1df4ec2ba4c24688c5b24e342cb352..4f33c8cdc1fd214e7bcce1578f7386ce4d29d161 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1029,8 +1029,9 @@ void transUnrefSrvHandle(void* handle) { } void transReleaseSrvHandle(void* handle) { - SExHandle* exh = handle; - int64_t refId = exh->refId; + SRpcHandleInfo* info = handle; + SExHandle* exh = info->handle; + int64_t refId = info->refId; ASYNC_CHECK_HANDLE(exh, refId); diff --git a/source/libs/transport/test/transUT.cpp b/source/libs/transport/test/transUT.cpp index 86c483028488fc6a3172d84475b1afe20ece5aba..b55f771ebd5181503d1f85cd1dccb986218f88d1 100644 --- a/source/libs/transport/test/transUT.cpp +++ b/source/libs/transport/test/transUT.cpp @@ -175,7 +175,7 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) rpcMsg.code = 0; rpcSendResponse(&rpcMsg); - rpcReleaseHandle(pMsg->info.handle, TAOS_CONN_SERVER); + rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER); } static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { { diff --git a/tests/system-test/1-insert/block_wise.py b/tests/system-test/1-insert/block_wise.py new file mode 100644 index 0000000000000000000000000000000000000000..6c779c64d751dba93a9bdb775afd7e6937f66ecb --- /dev/null +++ b/tests/system-test/1-insert/block_wise.py @@ -0,0 +1,442 @@ +import datetime +import re + +from dataclasses import dataclass, field +from typing import List, Any, Tuple +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.constant import * + +PRIMARY_COL = "ts" + +INT_COL = "c_int" +BINT_COL = "c_bint" +SINT_COL = "c_sint" +TINT_COL = "c_tint" +FLOAT_COL = "c_float" +DOUBLE_COL = "c_double" +BOOL_COL = "c_bool" +TINT_UN_COL = "c_utint" +SINT_UN_COL = "c_usint" +BINT_UN_COL = "c_ubint" +INT_UN_COL = "c_uint" +BINARY_COL = "c_binary" +NCHAR_COL = "c_nchar" +TS_COL = "c_ts" + +NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ] +CHAR_COL = [BINARY_COL, NCHAR_COL, ] +BOOLEAN_COL = [BOOL_COL, ] +TS_TYPE_COL = [TS_COL, ] + +INT_TAG = "t_int" + +ALL_COL = [PRIMARY_COL, INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BINARY_COL, NCHAR_COL, BOOL_COL, TS_COL] +TAG_COL = [INT_TAG] + +# insert data args: +TIME_STEP = 10000 +NOW = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) + +# init db/table +DBNAME = "db" +STBNAME = "stb1" +CTBNAME = "ct1" +NTBNAME = "nt1" + + +@dataclass +class DataSet: + ts_data : List[int] = field(default_factory=list) + int_data : List[int] = field(default_factory=list) + bint_data : List[int] = field(default_factory=list) + sint_data : List[int] = field(default_factory=list) + tint_data : List[int] = field(default_factory=list) + int_un_data : List[int] = field(default_factory=list) + bint_un_data: List[int] = field(default_factory=list) + sint_un_data: List[int] = field(default_factory=list) + tint_un_data: List[int] = field(default_factory=list) + float_data : List[float] = field(default_factory=list) + double_data : List[float] = field(default_factory=list) + bool_data : List[int] = field(default_factory=list) + binary_data : List[str] = field(default_factory=list) + nchar_data : List[str] = field(default_factory=list) + + +@dataclass +class BSMAschema: + creation : str = "CREATE" + tb_type : str = "stable" + tbname : str = STBNAME + cols : Tuple[str] = None + tags : Tuple[str] = None + sma_flag : str = "SMA" + sma_cols : Tuple[str] = None + create_tabel_sql : str = None + other : Any = None + + drop : str = "DROP" + drop_flag : str = "INDEX" + querySmaOptimize : int = 1 + show : str = "SHOW" + show_msg : str = "INDEXES" + show_oper : str = "FROM" + dbname : str = None + rollup_db : bool = False + + def __post_init__(self): + if isinstance(self.other, dict): + for k,v in self.other.items(): + + if k.lower() == "tbname" and isinstance(v, str) and not self.tbname: + self.tbname = v + del self.other[k] + + if k.lower() == "cols" and (isinstance(v, tuple) or isinstance(v, list)) and not self.cols: + self.cols = v + del self.other[k] + + if k.lower() == "tags" and (isinstance(v, tuple) or isinstance(v, list)) and not self.tags: + self.tags = v + del self.other[k] + + if k.lower() == "sma_flag" and isinstance(v, str) and not self.sma_flag: + self.sma_flag = v + del self.other[k] + + if k.lower() == "sma_cols" and (isinstance(v, tuple) or isinstance(v, list)) and not self.sma_cols: + self.sma_cols = v + del self.other[k] + + if k.lower() == "create_tabel_sql" and isinstance(v, str) and not self.create_tabel_sql: + self.create_tabel_sql = v + del self.other[k] + + # bSma show and drop operator is not completed + if k.lower() == "drop_flag" and isinstance(v, str) and not self.drop_flag: + self.drop_flag = v + del self.other[k] + + if k.lower() == "show_msg" and isinstance(v, str) and not self.show_msg: + self.show_msg = v + del self.other[k] + + if k.lower() == "dbname" and isinstance(v, str) and not self.dbname: + self.dbname = v + del self.other[k] + + if k.lower() == "show_oper" and isinstance(v, str) and not self.show_oper: + self.show_oper = v + del self.other[k] + + if k.lower() == "rollup_db" and isinstance(v, bool) and not self.rollup_db: + self.rollup_db = v + del self.other[k] + + + +# from ...pytest.util.sql import * +# from ...pytest.util.constant import * + +class TDTestCase: + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + self.precision = "ms" + self.sma_count = 0 + self.sma_created_index = [] + + def __create_sma_index(self, sma:BSMAschema): + if sma.create_tabel_sql: + sql = sma.create_tabel_sql + else: + sql = f"{sma.creation} {sma.tb_type} {sma.tbname} ({', '.join(sma.cols)}) " + + if sma.tb_type == "stable" or (sma.tb_type=="table" and sma.tags): + sql = f"{sma.creation} {sma.tb_type} {sma.tbname} ({', '.join(sma.cols)}) tags ({', '.join(sma.tags)}) " + + + if sma.sma_flag: + sql += sma.sma_flag + if sma.sma_cols: + sql += f"({', '.join(sma.sma_cols)})" + + if isinstance(sma.other, dict): + for k,v in sma.other.items(): + if isinstance(v,tuple) or isinstance(v, list): + sql += f" {k} ({' '.join(v)})" + else: + sql += f" {k} {v}" + if isinstance(sma.other, tuple) or isinstance(sma.other, list): + sql += " ".join(sma.other) + if isinstance(sma.other, int) or isinstance(sma.other, float) or isinstance(sma.other, str): + sql += f" {sma.other}" + + return sql + + def __get_bsma_table_col_tag_str(self, sql:str): + p = re.compile(r"[(](.*)[)]", re.S) + + if "tags" in (col_str := sql): + col_str = re.findall(p, sql.split("tags")[0])[0].split(",") + if (tag_str := re.findall(p, sql.split("tags")[1])[0].split(",") ): + col_str.extend(tag_str) + + return col_str + + def __get_bsma_col_tag_names(self, col_tags:list): + return [ col_tag.strip().split(" ")[0] for col_tag in col_tags ] + + @property + def __get_db_tbname(self): + tb_list = [] + tdSql.query("show tables") + for row in tdSql.queryResult: + tb_list.append(row[0]) + tdSql.query("show tables") + for row in tdSql.queryResult: + tb_list.append(row[0]) + + return tb_list + + def __bsma_create_check(self, sma:BSMAschema): + if not sma.creation: + return False + if not sma.create_tabel_sql and (not sma.tbname or not sma.tb_type or not sma.cols): + return False + if not sma.create_tabel_sql and (sma.tb_type == "stable" and not sma.tags): + return False + if not sma.sma_flag or not isinstance(sma.sma_flag, str) or sma.sma_flag.upper() != "SMA": + return False + if sma.tbname in self.__get_db_tbname: + return False + + if sma.create_tabel_sql: + col_tag_list = self.__get_bsma_col_tag_names(self.__get_bsma_table_col_tag_str(sma.create_tabel_sql)) + else: + col_str = list(sma.cols) + if sma.tags: + col_str.extend(list(sma.tags)) + col_tag_list = self.__get_bsma_col_tag_names(col_str) + if not sma.sma_cols: + return False + for col in sma.sma_cols: + if col not in col_tag_list: + return False + + return True + + def bsma_create_check(self, sma:BSMAschema): + if self.__bsma_create_check(sma): + tdSql.query(self.__create_sma_index(sma)) + tdLog.info(f"current sql: {self.__create_sma_index(sma)}") + + else: + tdSql.error(self.__create_sma_index(sma)) + + + def __sma_drop_check(self, sma:BSMAschema): + pass + + def sma_drop_check(self, sma:BSMAschema): + pass + + def __show_sma_index(self, sma:BSMAschema): + pass + + def __sma_show_check(self, sma:BSMAschema): + pass + + def sma_show_check(self, sma:BSMAschema): + pass + + @property + def __create_sma_sql(self): + err_sqls = [] + cur_sqls = [] + # err_set + ### case 1: required fields check + err_sqls.append( BSMAschema(creation="", tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=(PRIMARY_COL, INT_COL ) ) ) + err_sqls.append( BSMAschema(tbname="", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=(PRIMARY_COL, INT_COL ) ) ) + err_sqls.append( BSMAschema(tbname="stb2", cols=(), tags=(f"{INT_TAG} int",), sma_cols=(PRIMARY_COL, INT_COL ) ) ) + err_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(), sma_cols=(PRIMARY_COL, INT_COL ) ) ) + err_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_flag="", sma_cols=(PRIMARY_COL, INT_COL ) ) ) + err_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=() ) ) + ### case 2: + err_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=({BINT_COL}) ) ) + + # current_set + cur_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=(PRIMARY_COL, INT_COL ) ) ) + + return err_sqls, cur_sqls + + def test_create_sma(self): + err_sqls , cur_sqls = self.__create_sma_sql + for err_sql in err_sqls: + self.bsma_create_check(err_sql) + for cur_sql in cur_sqls: + self.bsma_create_check(cur_sql) + + @property + def __drop_sma_sql(self): + err_sqls = [] + cur_sqls = [] + # err_set + ## case 1: required fields check + return err_sqls, cur_sqls + + def test_drop_sma(self): + err_sqls , cur_sqls = self.__drop_sma_sql + for err_sql in err_sqls: + self.sma_drop_check(err_sql) + for cur_sql in cur_sqls: + self.sma_drop_check(cur_sql) + + def all_test(self): + self.test_create_sma() + + def __create_tb(self): + tdLog.printNoPrefix("==========step: create table") + create_stb_sql = f'''create table {STBNAME}( + ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, + {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, + {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp, + {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned, + {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned + ) tags ({INT_TAG} int) + ''' + create_ntb_sql = f'''create table {NTBNAME}( + ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, + {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, + {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp, + {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned, + {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned + ) + ''' + tdSql.execute(create_stb_sql) + tdSql.execute(create_ntb_sql) + + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + def __data_set(self, rows): + data_set = DataSet() + + for i in range(rows): + data_set.ts_data.append(NOW + 1 * (rows - i)) + data_set.int_data.append(rows - i) + data_set.bint_data.append(11111 * (rows - i)) + data_set.sint_data.append(111 * (rows - i) % 32767) + data_set.tint_data.append(11 * (rows - i) % 127) + data_set.int_un_data.append(rows - i) + data_set.bint_un_data.append(11111 * (rows - i)) + data_set.sint_un_data.append(111 * (rows - i) % 32767) + data_set.tint_un_data.append(11 * (rows - i) % 127) + data_set.float_data.append(1.11 * (rows - i)) + data_set.double_data.append(1100.0011 * (rows - i)) + data_set.bool_data.append((rows - i) % 2) + data_set.binary_data.append(f'binary{(rows - i)}') + data_set.nchar_data.append(f'nchar_测试_{(rows - i)}') + + return data_set + + def __insert_data(self): + tdLog.printNoPrefix("==========step: start inser data into tables now.....") + data = self.__data_set(rows=self.rows) + + # now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) + null_data = '''null, null, null, null, null, null, null, null, null, null, null, null, null, null''' + zero_data = "0, 0, 0, 0, 0, 0, 0, 'binary_0', 'nchar_0', 0, 0, 0, 0, 0" + + for i in range(self.rows): + row_data = f''' + {data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]}, + {data.bool_data[i]}, '{data.binary_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.tint_un_data[i]}, + {data.sint_un_data[i]}, {data.int_un_data[i]}, {data.bint_un_data[i]} + ''' + neg_row_data = f''' + {-1 * data.int_data[i]}, {-1 * data.bint_data[i]}, {-1 * data.sint_data[i]}, {-1 * data.tint_data[i]}, {-1 * data.float_data[i]}, {-1 * data.double_data[i]}, + {data.bool_data[i]}, '{data.binary_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {1 * data.tint_un_data[i]}, + {1 * data.sint_un_data[i]}, {1 * data.int_un_data[i]}, {1 * data.bint_un_data[i]} + ''' + + tdSql.execute( + f"insert into ct1 values ( {NOW - i * TIME_STEP}, {row_data} )") + tdSql.execute( + f"insert into ct2 values ( {NOW - i * int(TIME_STEP * 0.6)}, {neg_row_data} )") + tdSql.execute( + f"insert into ct4 values ( {NOW - i * int(TIME_STEP * 0.8) }, {row_data} )") + tdSql.execute( + f"insert into {NTBNAME} values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )") + + tdSql.execute( + f"insert into ct2 values ( {NOW + int(TIME_STEP * 0.6)}, {null_data} )") + tdSql.execute( + f"insert into ct2 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.6)}, {null_data} )") + tdSql.execute( + f"insert into ct2 values ( {NOW - self.rows * int(TIME_STEP * 0.29) }, {null_data} )") + + tdSql.execute( + f"insert into ct4 values ( {NOW + int(TIME_STEP * 0.8)}, {null_data} )") + tdSql.execute( + f"insert into ct4 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.8)}, {null_data} )") + tdSql.execute( + f"insert into ct4 values ( {NOW - self.rows * int(TIME_STEP * 0.39)}, {null_data} )") + + tdSql.execute( + f"insert into {NTBNAME} values ( {NOW + int(TIME_STEP * 1.2)}, {null_data} )") + tdSql.execute( + f"insert into {NTBNAME} values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )") + tdSql.execute( + f"insert into {NTBNAME} values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )") + + def run(self): + self.rows = 10 + + tdLog.printNoPrefix("==========step0:all check") + + tdLog.printNoPrefix("==========step1:create table in normal database") + tdSql.prepare() + self.__create_tb() + self.__insert_data() + self.all_test() + + # drop databases, create same name db、stb and sma index + tdSql.prepare() + self.__create_tb() + self.__insert_data() + self.all_test() + + tdLog.printNoPrefix("==========step2:create table in rollup database") + tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m") + tdSql.execute("use db3") + tdSql.query(f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL})") + + tdSql.execute("drop database if exists db1 ") + tdSql.execute("drop database if exists db2 ") + + tdDnodes.stop(1) + tdDnodes.start(1) + + tdLog.printNoPrefix("==========step4:after wal, all check again ") + tdSql.prepare() + self.__create_tb() + self.__insert_data() + self.all_test() + + # drop databases, create same name db、stb and sma index + tdSql.prepare() + self.__create_tb() + self.__insert_data() + self.all_test() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/1-insert/create_retentions.py b/tests/system-test/1-insert/create_retentions.py index 4b37eeb9a5c1e39d281cd9cd9a9209b418e397f1..db902d0031906b79febab9380a82b290c665bffe 100644 --- a/tests/system-test/1-insert/create_retentions.py +++ b/tests/system-test/1-insert/create_retentions.py @@ -1,6 +1,6 @@ import datetime -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import List from util.log import * from util.sql import * @@ -36,36 +36,20 @@ NOW = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) @dataclass class DataSet: - ts_data : List[int] = None - int_data : List[int] = None - bint_data : List[int] = None - sint_data : List[int] = None - tint_data : List[int] = None - int_un_data : List[int] = None - bint_un_data : List[int] = None - sint_un_data : List[int] = None - tint_un_data : List[int] = None - float_data : List[float] = None - double_data : List[float] = None - bool_data : List[int] = None - binary_data : List[str] = None - nchar_data : List[str] = None - - def __post_init__(self): - self.ts_data = [] - self.int_data = [] - self.bint_data = [] - self.sint_data = [] - self.tint_data = [] - self.int_un_data = [] - self.bint_un_data = [] - self.sint_un_data = [] - self.tint_un_data = [] - self.float_data = [] - self.double_data = [] - self.bool_data = [] - self.binary_data = [] - self.nchar_data = [] + ts_data : List[int] = field(default_factory=list) + int_data : List[int] = field(default_factory=list) + bint_data : List[int] = field(default_factory=list) + sint_data : List[int] = field(default_factory=list) + tint_data : List[int] = field(default_factory=list) + int_un_data : List[int] = field(default_factory=list) + bint_un_data: List[int] = field(default_factory=list) + sint_un_data: List[int] = field(default_factory=list) + tint_un_data: List[int] = field(default_factory=list) + float_data : List[float] = field(default_factory=list) + double_data : List[float] = field(default_factory=list) + bool_data : List[int] = field(default_factory=list) + binary_data : List[str] = field(default_factory=list) + nchar_data : List[str] = field(default_factory=list) class TDTestCase: @@ -107,15 +91,15 @@ class TDTestCase: f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(count) watermark 1min", f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay -1s", f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark -1m", - # f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ", - # f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ", + f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ", + f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ", f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} binary(16)) tags (tag1 int) rollup(avg) watermark 1s", - f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m", - # f"create table ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s", - # f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) tags (tag1 int) " , - # f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " , - # f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) " , - # f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " , + f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m", + # f"create table ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s", + f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) " , + f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " , + f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) " , + f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " , # watermark, max_delay: [0, 900000], [ms, s, m, ?] f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1u", @@ -136,8 +120,9 @@ class TDTestCase: f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 5s max_delay 1m", f"create stable stb3 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(max) watermark 5s max_delay 1m", f"create stable stb4 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(sum) watermark 5s max_delay 1m", - # f"create stable stb5 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(last) watermark 5s max_delay 1m", - # f"create stable stb6 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m", + f"create stable stb5 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(last) watermark 5s max_delay 1m", + f"create stable stb6 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m", + f"create stable stb7 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL})", ] def test_create_stb(self): @@ -150,7 +135,7 @@ class TDTestCase: # assert "rollup" in tdSql.description tdSql.checkRows(len(self.create_stable_sql_current)) - # tdSql.execute("use db") # because db is a noraml database, not a rollup database, should not be able to create a rollup database + tdSql.execute("use db") # because db is a noraml database, not a rollup database, should not be able to create a rollup stable # tdSql.error(f"create stable nor_db_rollup_stb ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 5s max_delay 1m") @@ -210,20 +195,6 @@ class TDTestCase: data_set.binary_data.append(f'binary{(rows - i)}') data_set.nchar_data.append(f'nchar_测试_{(rows - i)}') - # neg_data_set.ts_data.append(-1 * i) - # neg_data_set.int_data.append(-i) - # neg_data_set.bint_data.append(-11111 * i) - # neg_data_set.sint_data.append(-111 * i % 32767) - # neg_data_set.tint_data.append(-11 * i % 127) - # neg_data_set.int_un_data.append(-i) - # neg_data_set.bint_un_data.append(-11111 * i) - # neg_data_set.sint_un_data.append(-111 * i % 32767) - # neg_data_set.tint_un_data.append(-11 * i % 127) - # neg_data_set.float_data.append(-1.11 * i) - # neg_data_set.double_data.append(-1100.0011 * i) - # neg_data_set.binary_data.append(f'binary{i}') - # neg_data_set.nchar_data.append(f'nchar_测试_{i}') - return data_set def __insert_data(self): @@ -279,9 +250,14 @@ class TDTestCase: tdLog.printNoPrefix("==========step2:create table in rollup database") tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m") + + tdSql.execute("drop database if exists db1 ") + tdSql.execute("drop database if exists db2 ") + tdSql.execute("use db3") - self.__create_tb() - self.__insert_data() + # self.__create_tb() + # self.__insert_data() + self.all_test() tdSql.execute("drop database if exists db1 ") tdSql.execute("drop database if exists db2 ") diff --git a/tests/system-test/1-insert/time_range_wise.py b/tests/system-test/1-insert/time_range_wise.py index d4434987a6763bf9d5fde1fb72b2d217150e4b3b..a620a4b51a25682350ed2e5bf6a42678751b489c 100644 --- a/tests/system-test/1-insert/time_range_wise.py +++ b/tests/system-test/1-insert/time_range_wise.py @@ -325,7 +325,7 @@ class TDTestCase: def __sma_create_check(self, sma:SMAschema): if self.updatecfgDict["querySmaOptimize"] == 0: return False - # # TODO: if database is a rollup-db, can not create sma index + # TODO: if database is a rollup-db, can not create sma index # tdSql.query("select database()") # if sma.rollup_db : # return False @@ -493,8 +493,8 @@ class TDTestCase: err_sqls , cur_sqls = self.__drop_sma_sql for err_sql in err_sqls: self.sma_drop_check(err_sql) - # for cur_sql in cur_sqls: - # self.sma_drop_check(cur_sql) + for cur_sql in cur_sqls: + self.sma_drop_check(cur_sql) def all_test(self): self.test_create_sma() @@ -605,24 +605,23 @@ class TDTestCase: tdLog.printNoPrefix("==========step1:create table in normal database") tdSql.prepare() self.__create_tb() - # self.__insert_data() + self.__insert_data() self.all_test() # drop databases, create same name db、stb and sma index - # tdSql.prepare() - # self.__create_tb() - # self.__insert_data() - # self.all_test() - - - - return + tdSql.prepare() + self.__create_tb() + self.__insert_data() + self.all_test() tdLog.printNoPrefix("==========step2:create table in rollup database") tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m") tdSql.execute("use db3") - self.__create_tb() - self.__insert_data() + # self.__create_tb() + tdSql.execute(f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL}) ") + self.all_test() + + # self.__insert_data() tdSql.execute("drop database if exists db1 ") tdSql.execute("drop database if exists db2 ") diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py new file mode 100644 index 0000000000000000000000000000000000000000..dd8a0ad33ac40344ebe2cc93e8a1144085f0d24b --- /dev/null +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py @@ -0,0 +1,241 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 4 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("restart taosd to ensure that the data falls into the disk") + tdDnodes.stop(1) + tdDnodes.start(1) + return + + def tmqCase3(self): + tdLog.printNoPrefix("======== test case 3: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + totalRowsInserted = expectRowsList[0] + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 3 + expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3) + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + consumerId = 4 + expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 0") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + expectRows = 2 + resultList = tmqCom.selectConsumeResult(expectRows) + actConsumeTotalRows = resultList[0] + resultList[1] + + if not (totalRowsInserted == actConsumeTotalRows): + tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + time.sleep(10) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 3 end ...... ") + + def tmqCase4(self): + tdLog.printNoPrefix("======== test case 4: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + totalRowsInserted = expectRowsList[0] + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 5 + expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 0") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("wait commit notify") + tmqCom.getStartCommitNotifyFromTmqsim() + + tdLog.info("pkill consume processor") + tdCom.killProcessor("tmq_sim") + + # time.sleep(10) + + # reinit consume info, and start tmq_sim, then check consume result + tmqCom.initConsumerTable() + consumerId = 6 + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 1") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + actConsumeTotalRows = resultList[0] + + if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) + tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + time.sleep(10) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 4 end ...... ") + + def run(self): + tdSql.prepare() + self.prepareTestEnv() + self.tmqCase3() + self.tmqCase4() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1.py b/tests/system-test/7-tmq/tmqConsFromTsdb1.py index af06c774d6adbf8fdf3ce2db95239d9c62858289..a183eda1c7f05a0116a7d490542ea18ed6de243f 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1.py @@ -85,7 +85,7 @@ class TDTestCase: 'rowsPerTbl': 10000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 15, 'showMsg': 1, 'showRow': 1, 'snapshot': 1} diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 8f5791f1c59264feaf25d39ffd9ff9c558b225a7..8f1137510ed50d9736c44a3429ae6900f4d05938 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -19,11 +19,15 @@ python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py -python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py +python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py python3 ./test.py -f 1-insert/alter_stable.py python3 ./test.py -f 1-insert/alter_table.py python3 ./test.py -f 1-insert/insertWithMoreVgroup.py python3 ./test.py -f 1-insert/table_comment.py +python3 ./test.py -f 1-insert/time_range_wise.py +python3 ./test.py -f 1-insert/block_wise.py +python3 ./test.py -f 1-insert/create_retentions.py + #python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/distinct.py @@ -114,19 +118,19 @@ python3 ./test.py -f 2-query/twa.py python3 ./test.py -f 2-query/irate.py python3 ./test.py -f 2-query/function_null.py -python3 ./test.py -f 2-query/queryQnode.py +python3 ./test.py -f 2-query/queryQnode.py -#python3 ./test.py -f 6-cluster/5dnode1mnode.py +#python3 ./test.py -f 6-cluster/5dnode1mnode.py #python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 #python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 #python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 # python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3 -# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py # python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 # python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 @@ -158,3 +162,4 @@ python3 ./test.py -f 7-tmq/tmqAlterSchema.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg.py +python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg.py