未验证 提交 946d94a7 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #14129 from taosdata/test3.0/lihui

test: add test case for tmq
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1', 'topic2', 'topic3']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
queryString = "select ts, log(c1), cos(c1) from %s.%s where c1 > 3169" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 1
topicList = topicNameList[1]
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
queryString = "select ts, log(c1), atan(c1) from %s.%s where ts >= %d" %(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+6137)
sqlString = "create topic %s as %s" %(topicNameList[2], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 2
topicList = topicNameList[2]
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
# if expectRowsList[2] != resultList[0]:
# tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
# tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -132,3 +132,4 @@ python3 ./test.py -f 7-tmq/db.py
python3 ./test.py -f 7-tmq/tmqError.py
python3 ./test.py -f 7-tmq/schema.py
python3 ./test.py -f 7-tmq/stbFilter.py
python3 ./test.py -f 7-tmq/tmqCheckData.py
......@@ -24,6 +24,8 @@
#include "taos.h"
#include "taoserror.h"
#include "tlog.h"
#include "taosdef.h"
#include "types.h"
#define GREEN "\033[1;32m"
#define NC "\033[0m"
......@@ -49,6 +51,7 @@ typedef struct {
// char autoCommit[8]; // true, false
// char autoOffsetRest[16]; // none, earliest, latest
TdFilePtr pConsumeRowsFile;
int32_t ifCheckData;
int64_t expectMsgCnt;
......@@ -129,7 +132,6 @@ void initLogFile() {
char tmpString[128];
sprintf(filename, "%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
#ifdef WINDOWS
for (int i = 2; i < sizeof(filename); i++) {
if (filename[i] == ':') filename[i] = '-';
......@@ -296,35 +298,165 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
return 0;
}
static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
//if (shell.args.is_raw_time) {
// sprintf(buf, "%" PRId64, val);
// return buf;
//}
time_t tt;
int32_t ms = 0;
if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
ms = val % 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000);
ms = val % 1000000;
} else {
tt = (time_t)(val / 1000);
ms = val % 1000;
}
/*
comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) {
tt--;
if (precision == TSDB_TIME_PRECISION_NANO) {
ms += 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
ms += 1000000;
} else {
ms += 1000;
}
}
struct tm *ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms);
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", ms);
} else {
sprintf(buf + pos, ".%03d", ms);
}
return buf;
}
static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision) {
if (val == NULL) {
taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR);
return;
}
int n;
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
taosFprintfFile(pFile, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
taosFprintfFile(pFile, "%d", *((int8_t *)val));
break;
case TSDB_DATA_TYPE_UTINYINT:
taosFprintfFile(pFile, "%u", *((uint8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
taosFprintfFile(pFile, "%d", *((int16_t *)val));
break;
case TSDB_DATA_TYPE_USMALLINT:
taosFprintfFile(pFile, "%u", *((uint16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
taosFprintfFile(pFile, "%d", *((int32_t *)val));
break;
case TSDB_DATA_TYPE_UINT:
taosFprintfFile(pFile, "%u", *((uint32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
taosFprintfFile(pFile, "%" PRId64, *((int64_t *)val));
break;
case TSDB_DATA_TYPE_UBIGINT:
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val));
break;
case TSDB_DATA_TYPE_FLOAT:
taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
break;
case TSDB_DATA_TYPE_DOUBLE:
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val));
if (n > TMAX(25, length)) {
taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
} else {
taosFprintfFile(pFile, "%s", buf);
}
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
memcpy(buf, val, length);
buf[length] = 0;
taosFprintfFile(pFile, "\'%s\'", buf);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
shellFormatTimestamp(buf, *(int64_t *)val, precision);
taosFprintfFile(pFile, "'%s'", buf);
break;
default:
break;
}
}
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields, int32_t precision) {
for (int32_t i = 0; i < num_fields; i++) {
if (i > 0) {
taosFprintfFile(pFile, "\n");
}
shellDumpFieldToFile(pFile, (const char *)row[i], fields + i, length[i], precision);
}
taosFprintfFile(pFile, "\n");
}
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
char buf[1024];
int32_t totalRows = 0;
// printf("topic: %s\n", tmq_get_topic_name(msg));
int32_t vgroupId = tmq_get_vgroup_id(msg);
const char* dbName = tmq_get_db_name(msg);
taosFprintfFile(g_fp, "msg index:%" PRId64 ", consumerId: %d\n", msgIndex, pInfo->consumerId);
// taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId,
// tmq_get_table_name(msg));
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), vgroupId);
taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", tmq_get_topic_name(msg), vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
taos_print_row(buf, row, fields, numOfFields);
const char* tbName = tmq_get_table_name(msg);
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
if (0 != g_stConfInfo.saveRowFlag) {
saveConsumeContentToTbl(pInfo, buf);
}
//if (0 != g_stConfInfo.saveRowFlag) {
// saveConsumeContentToTbl(pInfo, buf);
//}
}
totalRows++;
......@@ -463,6 +595,18 @@ void loop_consume(SThreadInfo* pInfo) {
pInfo->consumerId);
pInfo->ts = taosGetTimestampMs();
if (pInfo->ifCheckData) {
char filename[256] = {0};
char tmpString[128];
//sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, getCurrentTimeString(tmpString));
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (pInfo->pConsumeRowsFile == NULL) {
taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString));
return;
}
}
while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册