提交 4511f582 编写于 作者: P plum-lihui

test: add test case for tmq

上级 aad78647
......@@ -61,7 +61,7 @@ class TDTestCase:
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return
def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,startTs):
def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tdSql.execute("use %s" %dbName)
pre_insert = "insert into "
......@@ -72,13 +72,15 @@ class TDTestCase:
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and (j%2000 == 0):
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tdSql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
# print(sql)
print("sql:%s"%sql)
#print("insert sql:%s"%sql)
tdSql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
......@@ -96,6 +98,7 @@ class TDTestCase:
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\
parameterDict["startTs"])
return
......@@ -117,14 +120,82 @@ class TDTestCase:
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 10, \
'rowsPerTbl': 10000, \
'batchNum': 10, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
time.sleep(1)
# wait stb ready
while 1:
tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1:
#if (self.queryRows == 1):
time.sleep(1)
break
tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column'
topicFromCtb = 'topic_ctb_column'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.query("show topics")
tdSql.checkRows(2)
topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0)
if topic1 != topicFromStb or topic1 != topicFromCtb:
tdLog.exit("topic error")
if topic2 != topicFromStb or topic2 != topicFromCtb:
tdLog.exit("topic error")
tdLog.info("create consume info table and consume result table")
cdbName = 'cdb'
tdSql.query("create database %s"%cdbName)
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] + 1) * parameterDict["ctbNum"]
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1, \
enable.auto.commit:false, \
auto.commit.interval.ms:6000, \
auto.offset.reset:none'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %l64d, %d)"%(consumerId, topicList, keyList, expectmsgcnt, ifcheckdata)
tdSql.query(sql)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s, -g %d, -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(taosCmd)
# wait for data ready
prepareEnvThread.join()
tdLog.info("check consume result")
while 1:
tdSql.query("select * from consumeresult")
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
#if (self.queryRows == 1):
time.sleep(1)
break
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdLog.printNoPrefix("======== test scenario 2: ")
......
......@@ -51,3 +51,7 @@ python3 ./test.py -f 2-query/arcsin.py
python3 ./test.py -f 2-query/arccos.py
python3 ./test.py -f 2-query/arctan.py
# python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 7-tmq/basic5.py
......@@ -37,6 +37,10 @@ typedef struct {
TdThread thread;
int32_t consumerId;
int32_t autoCommitIntervalMs; // 1000 ms
char autoCommit[8]; // true, false
char autoOffsetRest[16]; // none, earliest, latest
int32_t ifCheckData;
int64_t expectMsgCnt;
......@@ -120,6 +124,9 @@ void saveConfigToLogFile() {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
taosFprintfFile(g_fp, " Topics: ");
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
......@@ -251,7 +258,7 @@ void build_consumer(SThreadInfo* pInfo) {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
//tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -272,6 +279,9 @@ void build_consumer(SThreadInfo* pInfo) {
// tmq_conf_set(conf, "auto.offset.reset", "latest");
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return;
}
......@@ -324,9 +334,11 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalMsgs >= pInfo->expectMsgCnt, so break\n");
break;
}
} else {
taosFprintfFile(g_fp, "==== delay over time, so break\n");
break;
}
}
......@@ -355,6 +367,9 @@ void* consumeThreadFunc(void* param) {
exit(-1);
}
tmq_list_destroy(pInfo->topicList);
pInfo->topicList = NULL;
loop_consume(pInfo);
tmq_commit(pInfo->tmq, NULL, 0);
......@@ -442,6 +457,11 @@ int32_t getConsumeInfo() {
while ((row = taos_fetch_row(pRes))) {
int32_t* lengths = taos_fetch_lengths(pRes);
// set default value
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) {
continue;
......@@ -457,6 +477,12 @@ int32_t getConsumeInfo() {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
} else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, row[i], lengths[i]);
} else if ((7 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = *((int32_t*)row[i]);
} else if ((8 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, row[i], lengths[i]);
}
}
numOfThread++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册