diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 9881c8cb445a9cea43ff049ad5f8dafe658d2987..7f7fe76139e5803e5e263e879578e03467d38aff 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -249,13 +249,6 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ (_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY) -#define NEED_SCHEDULER_RETRY_ERROR(_code) \ - (NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ - (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR || (_code) == TSDB_CODE_RPC_BROKEN_LINK) - - - - #define REQUEST_TOTAL_EXEC_TIMES 2 #define qFatal(...) \ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b6009ebbda83b9c22248059f22273ed6b5c2e88e..7d42bbdad979014aa00d7d04621f58ffb925e6a0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -786,6 +786,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*)param; pRequest->code = code; + pRequest->body.resInfo.execRes = pResult->res; if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { @@ -797,6 +798,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { } } + taosMemoryFree(pResult); + tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 94a11087106b730e2e2df1a6e1acfd92b6fd8d45..aaa8274ce8808a2f47e4317624603667b498aa6f 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -276,8 +276,6 @@ extern SSchedulerMgmt schMgmt; #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1) -#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0) -#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1) #define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) #define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) @@ -309,7 +307,10 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) -#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (((_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_BROKEN_LINK) && ((_len) > 0)) +#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0)) +#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH) +#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen))) +#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 425ea242fdbd3b01e6808583128b6f23fdba5351..6e4838e50ce17f7abfa6dbf081c5bfd6b7523737 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -835,7 +835,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } - if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) { + if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) { *needRetry = false; SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode)); return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 69e41d31111a2ed3ec5f237635a82a839c428fcd..02e7b085ae7bddaa75213509b1b0c9f862bdf516 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -23,7 +23,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { - int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask); + int32_t lastMsgType = pTask->lastMsgType; int32_t taskStatus = SCH_GET_TASK_STATUS(pTask); int32_t reqMsgType = msgType - 1; switch (msgType) { @@ -42,7 +42,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy TMSG_INFO(msgType)); } - SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; case TDMT_SCH_FETCH_RSP: if (lastMsgType != reqMsgType && -1 != lastMsgType) { @@ -57,7 +57,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_DROP_TABLE_RSP: @@ -82,7 +82,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; } @@ -396,7 +396,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); - if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || SCH_SUB_TASK_NETWORK_ERR(rspCode, pMsg->len > 0)) { + int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); + if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) { code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode); goto _return; } @@ -855,6 +856,9 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery addr->nodeId, epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle); + if (pTask) { + pTask->lastMsgType = msgType; + } int64_t transporterId = 0; code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); @@ -1098,8 +1102,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, break; } - SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); - SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); @@ -1112,7 +1114,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, _return: - SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + pTask->lastMsgType = -1; schFreeRpcCtx(&rpcCtx); taosMemoryFreeClear(msg); diff --git a/tests/system-test/1-insert/insertWithMoreVgroup.py b/tests/system-test/1-insert/insertWithMoreVgroup.py index e3a20cebf88f08652a9a1524754e1e3513382636..80468509eecc28b0f0d6f40af8124e0f11805d71 100644 --- a/tests/system-test/1-insert/insertWithMoreVgroup.py +++ b/tests/system-test/1-insert/insertWithMoreVgroup.py @@ -355,40 +355,6 @@ class TDTestCase: return - def test_case4(self): - self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 2, 1*10) - tdSql.execute("use db1;") - tdSql.query("show dnodes;") - dnodeId=tdSql.getData(0,0) - print(dnodeId) - tdSql.execute("create qnode on dnode %s"%dnodeId) - tdSql.query("select max(c1) from stb10;") - maxQnode=tdSql.getData(0,0) - tdSql.query("select min(c1) from stb11;") - minQnode=tdSql.getData(0,0) - tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") - unionQnode=tdSql.queryResult - tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") - unionallQnode=tdSql.queryResult - - # tdSql.query("show qnodes;") - # qnodeId=tdSql.getData(0,0) - tdSql.execute("drop qnode on dnode %s"%dnodeId) - tdSql.execute("reset query cache") - tdSql.query("select max(c1) from stb10;") - tdSql.checkData(0, 0, "%s"%maxQnode) - tdSql.query("select min(c1) from stb11;") - tdSql.checkData(0, 0, "%s"%minQnode) - tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") - unionVnode=tdSql.queryResult - assert unionQnode == unionVnode - tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") - unionallVnode=tdSql.queryResult - assert unionallQnode == unionallVnode - - - # tdSql.execute("create qnode on dnode %s"%dnodeId) - # run case def run(self): diff --git a/tests/system-test/1-insert/test_stmt_insert_query_ex.py b/tests/system-test/1-insert/test_stmt_insert_query_ex.py index 2a66df866d9679f16c9372777f0babfeabe24a5a..2a64c09ff4c09d4ca41e2ac0f5d621141eb05bea 100644 --- a/tests/system-test/1-insert/test_stmt_insert_query_ex.py +++ b/tests/system-test/1-insert/test_stmt_insert_query_ex.py @@ -82,7 +82,7 @@ class TDTestCase: return con def test_stmt_set_tbname_tag(self,conn): - dbname = "stmt_set_tbname_tag" + dbname = "stmt_tag" try: conn.execute("drop database if exists %s" % dbname) @@ -196,31 +196,31 @@ class TDTestCase: assert rows9[0][0] == 12, 'fourth case is failed' assert rows9[1][0] == 12, 'fourth case is failed' - # #query: conversion Functions + #query: conversion Functions - # querystmt4=conn.statement("select cast( ? as bigint) from log ") - # queryparam4=new_bind_params(1) - # print(type(queryparam4)) - # queryparam4[0].binary('1232a') - # querystmt4.bind_param(queryparam4) - # querystmt4.execute() - # result4=querystmt4.use_result() - # rows4=result4.fetch_all() - # print("5",rows4) - # assert rows4[0][0] == 1232 - # assert rows4[1][0] == 1232 + querystmt4=conn.statement("select cast( ? as bigint) from log ") + queryparam4=new_bind_params(1) + print(type(queryparam4)) + queryparam4[0].binary('1232a') + querystmt4.bind_param(queryparam4) + querystmt4.execute() + result4=querystmt4.use_result() + rows4=result4.fetch_all() + print("5",rows4) + assert rows4[0][0] == 1232 + assert rows4[1][0] == 1232 - # querystmt4=conn.statement("select cast( ? as binary(10)) from log ") - # queryparam4=new_bind_params(1) - # print(type(queryparam4)) - # queryparam4[0].int(123) - # querystmt4.bind_param(queryparam4) - # querystmt4.execute() - # result4=querystmt4.use_result() - # rows4=result4.fetch_all() - # print("6",rows4) - # assert rows4[0][0] == '123' - # assert rows4[1][0] == '123' + querystmt4=conn.statement("select cast( ? as binary(10)) from log ") + queryparam4=new_bind_params(1) + print(type(queryparam4)) + queryparam4[0].int(123) + querystmt4.bind_param(queryparam4) + querystmt4.execute() + result4=querystmt4.use_result() + rows4=result4.fetch_all() + print("6",rows4) + assert rows4[0][0] == '123' + assert rows4[1][0] == '123' # #query: datatime Functions diff --git a/tests/system-test/1-insert/test_stmt_muti_insert_query.py b/tests/system-test/1-insert/test_stmt_muti_insert_query.py index 971be6c85fb20e4960fe902cd3b329acb4931237..de10b2f6b9b848907799eb86fff9b26e1d2f192e 100644 --- a/tests/system-test/1-insert/test_stmt_muti_insert_query.py +++ b/tests/system-test/1-insert/test_stmt_muti_insert_query.py @@ -84,21 +84,21 @@ class TDTestCase: def test_stmt_insert_multi(self,conn): # type: (TaosConnection) -> None - dbname = "pytest_taos_stmt_multi" + dbname = "db_stmt" try: conn.execute("drop database if exists %s" % dbname) conn.execute("create database if not exists %s" % dbname) conn.select_db(dbname) conn.execute( - "create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\ + "create table if not exists stb1(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\ bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \ ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", ) # conn.load_table_info("log") start = datetime.now() - stmt = conn.statement("insert into log values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + stmt = conn.statement("insert into stb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") params = new_multi_binds(16) params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) @@ -125,7 +125,7 @@ class TDTestCase: assert stmt.affected_rows == 3 #query 1 - querystmt=conn.statement("select ?,bu from log") + querystmt=conn.statement("select ?,bu from stb1") queryparam=new_bind_params(1) print(type(queryparam)) queryparam[0].binary("ts") @@ -135,7 +135,7 @@ class TDTestCase: # rows=result.fetch_all() # print( querystmt.use_result()) - # result = conn.query("select * from log") + # result = conn.query("select * from stb1") rows=result.fetch_all() # rows=result.fetch_all() print(rows) @@ -144,7 +144,7 @@ class TDTestCase: assert rows[2][1] == None #query 2 - querystmt1=conn.statement("select * from log where bu < ?") + querystmt1=conn.statement("select * from stb1 where bu < ?") queryparam1=new_bind_params(1) print(type(queryparam1)) queryparam1[0].int(4) diff --git a/tests/system-test/2-query/queryQnode.py b/tests/system-test/2-query/queryQnode.py index 15b643f5d7e73754710d7aca2de6cafef054cefd..0011fe248f2c951f4829cc02d9f718310e429789 100644 --- a/tests/system-test/2-query/queryQnode.py +++ b/tests/system-test/2-query/queryQnode.py @@ -17,6 +17,8 @@ import threading as thd import multiprocessing as mp from numpy.lib.function_base import insert import taos +from util.dnodes import TDDnode +from util.dnodes import * from util.log import * from util.cases import * from util.sql import * @@ -30,9 +32,9 @@ class TDTestCase: # # --------------- main frame ------------------- # - clientCfgDict = {'queryproxy': '1','debugFlag': 135} - clientCfgDict["queryproxy"] = '2' - clientCfgDict["debugFlag"] = 143 + clientCfgDict = {'queryPolicy': '1','debugFlag': 135} + clientCfgDict["queryPolicy"] = '1' + clientCfgDict["debugFlag"] = 131 updatecfgDict = {'clientCfg': {}} updatecfgDict = {'debugFlag': 143} @@ -62,7 +64,7 @@ class TDTestCase: return buildPath # init - def init(self, conn, logSql): + def init(self, conn, logSql=True): tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor()) # tdSql.prepare() @@ -292,12 +294,13 @@ class TDTestCase: tdLog.debug("-----create database and muti-thread create tables test------- ") - def test_case4(self): + def test_case1(self): self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 2, 1*10) tdSql.execute("use db1;") tdSql.query("show dnodes;") dnodeId=tdSql.getData(0,0) print(dnodeId) + tdLog.debug("create qnode on dnode %s"%dnodeId) tdSql.execute("create qnode on dnode %s"%dnodeId) tdSql.query("select max(c1) from stb10;") maxQnode=tdSql.getData(0,0) @@ -310,6 +313,7 @@ class TDTestCase: # tdSql.query("show qnodes;") # qnodeId=tdSql.getData(0,0) + tdLog.debug("drop qnode on dnode %s"%dnodeId) tdSql.execute("drop qnode on dnode %s"%dnodeId) tdSql.execute("reset query cache") tdSql.query("select max(c1) from stb10;") @@ -323,15 +327,156 @@ class TDTestCase: unionallVnode=tdSql.queryResult assert unionallQnode == unionallVnode + queryPolicy=2 + simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath() + cmd='sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'%simClientCfg + os.system(cmd) + # tdDnodes.stop(1) + # tdDnodes.start(1) + tdSql.execute("reset query cache") + tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy) + tdSql.query("show local variables;") + for i in range(tdSql.queryRows): + if tdSql.queryResult[i][0] == "queryPolicy" : + if int(tdSql.queryResult[i][1]) == int(queryPolicy): + tdLog.success('alter queryPolicy to %d successfully'%queryPolicy) + else : + tdLog.debug(tdSql.queryResult) + tdLog.exit("alter queryPolicy to %d failed"%queryPolicy) + tdSql.execute("reset query cache") + + tdSql.execute("use db1;") + tdSql.query("show dnodes;") + dnodeId=tdSql.getData(0,0) + tdLog.debug("create qnode on dnode %s"%dnodeId) + + tdSql.execute("create qnode on dnode %s"%dnodeId) + tdSql.query("select max(c1) from stb10;") + assert maxQnode==tdSql.getData(0,0) + tdSql.query("select min(c1) from stb11;") + assert minQnode==tdSql.getData(0,0) + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") + assert unionQnode==tdSql.queryResult + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") + assert unionallQnode==tdSql.queryResult + + # tdSql.query("show qnodes;") + # qnodeId=tdSql.getData(0,0) + tdLog.debug("drop qnode on dnode %s"%dnodeId) + tdSql.execute("drop qnode on dnode %s"%dnodeId) + tdSql.execute("reset query cache") + tdSql.query("select max(c1) from stb10;") + assert maxQnode==tdSql.getData(0,0) + tdSql.query("select min(c1) from stb11;") + assert minQnode==tdSql.getData(0,0) + tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") + tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") # tdSql.execute("create qnode on dnode %s"%dnodeId) + queryPolicy=3 + simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath() + cmd='sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'%simClientCfg + os.system(cmd) + # tdDnodes.stop(1) + # tdDnodes.start(1) + tdSql.execute("reset query cache") + tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy) + tdSql.query("show local variables;") + for i in range(tdSql.queryRows): + if tdSql.queryResult[i][0] == "queryPolicy" : + if int(tdSql.queryResult[i][1]) == int(queryPolicy): + tdLog.success('alter queryPolicy to %d successfully'%queryPolicy) + else : + tdLog.debug(tdSql.queryResult) + tdLog.exit("alter queryPolicy to %d failed"%queryPolicy) + tdSql.execute("reset query cache") + + tdSql.execute("use db1;") + tdSql.query("show dnodes;") + dnodeId=tdSql.getData(0,0) + tdLog.debug("create qnode on dnode %s"%dnodeId) + + tdSql.execute("create qnode on dnode %s"%dnodeId) + tdSql.query("select max(c1) from stb10;") + assert maxQnode==tdSql.getData(0,0) + tdSql.query("select min(c1) from stb11;") + assert minQnode==tdSql.getData(0,0) + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") + assert unionQnode==tdSql.queryResult + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") + assert unionallQnode==tdSql.queryResult + + def test_case2(self): + self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 10, 2, 1*10) + tdSql.query("show qnodes") + if tdSql.queryRows == 1 : + tdLog.debug("drop qnode on dnode 1") + tdSql.execute("drop qnode on dnode 1") + queryPolicy=2 + simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath() + cmd='sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'%simClientCfg + os.system(cmd) + # tdDnodes.stop(1) + # tdDnodes.start(1) + tdSql.execute("reset query cache") + tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy) + tdSql.query("show local variables;") + for i in range(tdSql.queryRows): + if tdSql.queryResult[i][0] == "queryPolicy" : + if int(tdSql.queryResult[i][1]) == int(queryPolicy): + tdLog.success('alter queryPolicy to %d successfully'%queryPolicy) + else : + tdLog.debug(tdSql.queryResult) + tdLog.exit("alter queryPolicy to %d failed"%queryPolicy) + tdSql.execute("use db1;") + tdSql.error("select max(c1) from stb10;") + tdSql.error("select min(c1) from stb11;") + tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") + tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") + + tdSql.query("select max(c1) from stb10_0;") + tdSql.query("select min(c1) from stb11_0;") + + def test_case3(self): + + tdSql.execute('alter local "queryPolicy" "3"') + tdLog.debug("create qnode on dnode 1") + tdSql.execute("create qnode on dnode 1") + tdSql.execute("use db1;") + tdSql.query("show dnodes;") + dnodeId=tdSql.getData(0,0) + print(dnodeId) + + tdSql.query("select max(c1) from stb10;") + maxQnode=tdSql.getData(0,0) + tdSql.query("select min(c1) from stb11;") + minQnode=tdSql.getData(0,0) + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") + unionQnode=tdSql.queryResult + tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") + unionallQnode=tdSql.queryResult + + # tdSql.query("show qnodes;") + # qnodeId=tdSql.getData(0,0) + tdLog.debug("drop qnode on dnode %s"%dnodeId) + + tdSql.execute("drop qnode on dnode %s"%dnodeId) + tdSql.execute("reset query cache") + + tdSql.error("select max(c1) from stb10;") + tdSql.error("select min(c1) from stb11;") + tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;") + tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;") + # run case def run(self): - # test qnode - self.test_case4() - tdLog.debug(" LIMIT test_case3 ............ [OK]") + self.test_case1() + self.test_case2() + + self.test_case3() + # tdLog.debug(" LIMIT test_case3 ............ [OK]") return diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py index e7353b0f1336a59a25068616277d74a07df4aab8..80508979c54836325b3a9d1c55caad524144c5b6 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py @@ -65,31 +65,6 @@ class TDTestCase: self._async_raise(thread.ident, SystemExit) - def insertData(self,countstart,countstop): - # fisrt add data : db\stable\childtable\general table - - for couti in range(countstart,countstop): - tdLog.debug("drop database if exists db%d" %couti) - tdSql.execute("drop database if exists db%d" %couti) - print("create database if not exists db%d replica 1 duration 300" %couti) - tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) - tdSql.execute("use db%d" %couti) - tdSql.execute( - '''create table stb1 - (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) - tags (t1 int) - ''' - ) - tdSql.execute( - ''' - create table t1 - (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) - ''' - ) - for i in range(4): - tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') - - def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'db', @@ -143,7 +118,8 @@ class TDTestCase: threads=[] for i in range(restartNumbers): dbNameIndex = '%s%d'%(paraDict["dbName"],i) - threads.append(threading.Thread(target=clusterComCreate.create_databases, args=(tdSql, dbNameIndex,paraDict["dbNumbers"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.create_databases, args=(newTdSql, dbNameIndex,paraDict["dbNumbers"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))) for tr in threads: tr.start() diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 2d378e623d73635ec992084395b6042c011caf37..cde09b4c5835fe649bc64dd622e021169957ef07 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -39,6 +39,7 @@ class ClusterComCheck: def checkDnodes(self,dnodeNumbers): count=0 + # print(tdSql) while count < 5: tdSql.query("show dnodes") # tdLog.debug(tdSql.queryResult) @@ -85,7 +86,7 @@ class ClusterComCheck: tdLog.debug("check %s_%d that status is ready "%(dbNameIndex,j)) else: continue - print(query_status) + # print(query_status) count+=1 if query_status == dbNumbers: tdLog.success("we find cluster with %d dnode and check all databases are ready within 5s! " %dbNumbers) diff --git a/tests/system-test/6-cluster/clusterCommonCreate.py b/tests/system-test/6-cluster/clusterCommonCreate.py index 78aac309a051e4cf392025fb4f52835397656fa5..9d3f74a2e3384a454b0905d64aaff139567f4678 100644 --- a/tests/system-test/6-cluster/clusterCommonCreate.py +++ b/tests/system-test/6-cluster/clusterCommonCreate.py @@ -125,7 +125,6 @@ class ClusterComCreate: def create_databases(self,tsql,dbNameIndex,dbNumbers,dropFlag=1,vgroups=4,replica=1): for i in range(dbNumbers): - print(dbNumbers) if dropFlag == 1: tsql.execute("drop database if exists %s_%d"%(dbNameIndex,i)) tsql.execute("create database if not exists %s_%d vgroups %d replica %d"%(dbNameIndex,i, vgroups, replica)) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index a4e191b36a30fb2dbf49d374ebeddd3f92023b00..957ee17e0be9747a8e5a41d2815d1b2421805110 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -121,7 +121,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 -# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 # python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3