未验证 提交 fb49aa6d 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #15330 from taosdata/test3.0/lihui

Test3.0/lihui
......@@ -18,7 +18,7 @@ class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 2
self.ctbNum = 1000
self.ctbNum = 100
self.rowsPerTbl = 1000
def init(self, conn, logSql):
......@@ -38,9 +38,9 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 100,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
......@@ -131,13 +131,13 @@ class TDTestCase:
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# time.sleep(3)
tmqCom.getStartCommitNotifyFromTmqsim()
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("================= restart dnode ===========================")
tdDnodes.stop(1)
tdDnodes.start(1)
time.sleep(3)
tdDnodes.stoptaosd(1)
tdDnodes.starttaosd(1)
# time.sleep(3)
tdLog.info("insert process end, and start to check consume result")
tdLog.info(" restart taosd end and wait to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
......@@ -186,7 +186,7 @@ class TDTestCase:
# tdLog.info("****************************************************************************")
tmqCom.waitSubscriptionExit(tdSql)
tmqCom.waitSubscriptionExit(tdSql, topicFromStb1)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
......@@ -204,11 +204,11 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 3000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'pollDelay': 20,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
......@@ -250,15 +250,15 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartCommitNotifyFromTmqsim()
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("================= restart dnode ===========================")
tdDnodes.stop(1)
tdDnodes.start(1)
time.sleep(3)
tdDnodes.stoptaosd(1)
tdDnodes.starttaosd(1)
# time.sleep(3)
# tdLog.info("create some new child table and insert data ")
# paraDict["batchNum"] = 1000
# paraDict["ctbPrefix"] = 'newCtb'
tdLog.info("create some new child table and insert data ")
paraDict["batchNum"] = 100
paraDict["ctbPrefix"] = 'newCtb'
# tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("insert process end, and start to check consume result")
......@@ -275,6 +275,7 @@ class TDTestCase:
if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!")
tmqCom.waitSubscriptionExit(tdSql, topicFromStb1)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
......
......@@ -221,7 +221,7 @@ python3 ./test.py -f 7-tmq/tmqDropStb.py
python3 ./test.py -f 7-tmq/tmqDropStbCtb.py
python3 ./test.py -f 7-tmq/tmqDropNtb.py
python3 ./test.py -f 7-tmq/tmqUdf.py
# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
......
......@@ -352,6 +352,29 @@ void ltrim(char* str) {
// return str;
}
int queryDB(TAOS* taos, char* command) {
int retryCnt = 10;
int code;
TAOS_RES* pRes;
while (retryCnt--) {
pRes = taos_query(taos, command);
code = taos_errno(pRes);
if (code != 0) {
taosSsleep(1);
taos_free_result(pRes);
continue;
}
taos_free_result(pRes);
return 0;
}
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
taos_free_result(pRes);
return -1;
}
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
int32_t i;
for (i = 0; i < pInfo->numOfVgroups; i++) {
......@@ -374,30 +397,49 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
}
}
TAOS* createNewTaosConnect() {
TAOS* taos = NULL;
int32_t retryCnt = 10;
while (retryCnt--) {
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (NULL != taos) {
return taos;
}
taosSsleep(1);
}
taosFprintfFile(g_fp, "taos_connect() fail\n");
return NULL;
}
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
char sqlStr[1100] = {0};
if (strlen(buf) > 1024) {
taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", strlen(buf));
taosCloseFile(&g_fp);
exit(-1);
return -1;
}
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
if (pConn == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not save consume result to main script\n");
return -1;
}
sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId,
pInfo->ts++, buf);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error in insert consume result, reason:%s\n", taos_errstr(pRes));
int retCode = queryDB(pConn, sqlStr);
if (retCode != 0) {
taosFprintfFile(g_fp, "error in save consume content\n");
taosCloseFile(&g_fp);
taos_free_result(pRes);
taos_close(pConn);
exit(-1);
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
......@@ -591,15 +633,12 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t code = tmq_get_raw_meta(msg, &raw);
if(code == TSDB_CODE_SUCCESS){
TAOS_RES* pRes = taos_query(pInfo->taos, "use metadb");
if (taos_errno(pRes) != 0) {
pError("error when use metadb, reason:%s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error when use metadb, reason:%s\n", taos_errstr(pRes));
int retCode = queryDB(pInfo->taos, "use metadb");
if (retCode != 0) {
taosFprintfFile(g_fp, "error when use metadb\n");
taosCloseFile(&g_fp);
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
taosFprintfFile(g_fp, "raw:%p\n", &raw);
taos_write_raw_meta(pInfo->taos, raw);
......@@ -618,19 +657,6 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
return totalRows;
}
int queryDB(TAOS* taos, char* command) {
TAOS_RES* pRes = taos_query(taos, command);
int code = taos_errno(pRes);
if (code != 0) {
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
return 0;
}
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
......@@ -720,15 +746,12 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
char tmpString[128];
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
TAOS_RES* pRes = taos_query(pInfo->taos, sqlStr);
if (taos_errno(pRes) != 0) {
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
exit(-1);
int retCode = queryDB(pInfo->taos, sqlStr);
if (retCode != 0) {
taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId);
return -1;
}
taos_free_result(pRes);
return 0;
}
......@@ -823,18 +846,18 @@ void loop_consume(SThreadInfo* pInfo) {
void* consumeThreadFunc(void* param) {
SThreadInfo* pInfo = (SThreadInfo*)param;
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
pInfo->taos = createNewTaosConnect();
if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
ASSERT(0);
return NULL;
}
build_consumer(pInfo);
build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
assert(0);
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL;
}
......@@ -842,7 +865,8 @@ void* consumeThreadFunc(void* param) {
if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
assert(0);
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL;
}
......@@ -926,17 +950,20 @@ void parseConsumeInfo() {
int32_t getConsumeInfo() {
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS* pConn = createNewTaosConnect();
if (pConn == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not get consume info for start consumer\n");
return -1;
}
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
TAOS_RES *pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
pError("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error in get consumeinfo for %s\n", taos_errstr(pRes));
taosCloseFile(&g_fp);
taos_free_result(pRes);
exit(-1);
taos_close(pConn);
return -1;
}
TAOS_ROW row = NULL;
......@@ -981,6 +1008,7 @@ int32_t getConsumeInfo() {
taos_free_result(pRes);
parseConsumeInfo();
taos_close(pConn);
return 0;
}
......@@ -1123,7 +1151,6 @@ void* ombConsumeThreadFunc(void* param) {
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
assert(0);
return NULL;
}
......@@ -1131,7 +1158,6 @@ void* ombConsumeThreadFunc(void* param) {
if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
assert(0);
return NULL;
}
......@@ -1181,9 +1207,9 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) {
void* ombProduceThreadFunc(void* param) {
SThreadInfo* pInfo = (SThreadInfo*)param;
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
pInfo->taos = createNewTaosConnect();
if (pInfo->taos == NULL) {
printf("taos_connect() fail\n");
taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n");
return NULL;
}
......@@ -1200,6 +1226,8 @@ void* ombProduceThreadFunc(void* param) {
char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN);
if (NULL == sqlBuf) {
printf("malloc fail for sqlBuf\n");
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL;
}
......@@ -1232,6 +1260,8 @@ void* ombProduceThreadFunc(void* param) {
int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
if (affectedRows < 0) {
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL;
}
......@@ -1266,6 +1296,8 @@ void* ombProduceThreadFunc(void* param) {
}
printf("affectedRowsTotal: %"PRId64"\n", affectedRowsTotal);
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL;
}
......@@ -1301,10 +1333,9 @@ void startOmbConsume() {
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
if (0 != g_stConfInfo.producers) {
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
TAOS* taos = createNewTaosConnect();
if (taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
ASSERT(0);
taosFprintfFile(g_fp, "taos_connect() fail, can not create db, stbl, ctbl, topic!\n");
return ;
}
......@@ -1357,9 +1388,11 @@ void startOmbConsume() {
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp);
taos_close(taos);
return;
}
taos_close(taos);
}
// pthread_create one thread to consume
......@@ -1418,7 +1451,11 @@ int main(int32_t argc, char* argv[]) {
return 0;
}
getConsumeInfo();
int32_t retCode = getConsumeInfo();
if (0 != retCode) {
return -1;
}
saveConfigToLogFile();
tmqSetSignalHandle();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册