提交 7fe7b962 编写于 作者: P plum-lihui

test:add test case for tmq

上级 291aab04
...@@ -11,7 +11,9 @@ ...@@ -11,7 +11,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from asyncore import loop
from collections import defaultdict from collections import defaultdict
import subprocess
import random import random
import string import string
import threading import threading
...@@ -75,7 +77,7 @@ class TMQCom: ...@@ -75,7 +77,7 @@ class TMQCom:
return resultList return resultList
def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0):
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath() cfgPath = tdCom.getClientCfgPath()
if valgrind == 1: if valgrind == 1:
...@@ -88,30 +90,53 @@ class TMQCom: ...@@ -88,30 +90,53 @@ class TMQCom:
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &" shellCmd += "> nul 2>&1 &"
else: else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath processorName = buildPath + '/build/bin/tmq_sim'
if alias != 0:
processorNameNew = buildPath + '/build/bin/tmq_sim_new'
shellCmd = 'cp %s %s'%(processorName, processorNameNew)
os.system(shellCmd)
processorName = processorNameNew
shellCmd = 'nohup ' + processorName + ' -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'): def stopTmqSimProcess(self, processorName):
while 1: psCmd = "ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID):
killCmd = "kill -INT %s > /dev/null 2>&1" % processID
os.system(killCmd)
time.sleep(0.2)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
tdLog.debug("%s is stopped by kill -INT" % (processorName))
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb',rows=1):
loopFlag = 1
while loopFlag:
tdSql.query("select * from %s.notifyinfo"%cdbName) tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0): actRows = tdSql.getRows()
break if (actRows >= rows):
else: for i in range(actRows):
time.sleep(0.1) if tdSql.getData(i, 1) == 0:
loopFlag = 0
break
time.sleep(0.1)
return return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'): def getStartCommitNotifyFromTmqsim(self,cdbName='cdb',rows=2):
while 1: loopFlag = 1
while loopFlag:
tdSql.query("select * from %s.notifyinfo"%cdbName) tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 : actRows = tdSql.getRows()
print(tdSql.getData(0, 1), tdSql.getData(1, 1)) if (actRows >= rows):
if tdSql.getData(1, 1) == 1: for i in range(actRows):
break if tdSql.getData(i, 1) == 1:
loopFlag = 0
break
time.sleep(0.1) time.sleep(0.1)
return return
......
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 10,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1', 'topic2']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data_2(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
# queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# create one stb2
paraDict["stbName"] = 'stb2'
paraDict["ctbPrefix"] = 'ctbx'
paraDict["rowsPerTbl"] = 5000
tdLog.info("create stb2")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb2")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# queryString = "select ts, sin(c1), abs(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, sin(c1), abs(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = "%s,%s"%(topicNameList[0],topicNameList[1])
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:3000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("start consume processor 2")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],'cdb',0,1)
tdLog.info("async insert data")
pThread = tmqCom.asyncInsertData(paraDict)
tdLog.info("wait consumer commit notify")
tmqCom.getStartCommitNotifyFromTmqsim(rows=4)
tdLog.info("pkill one consume processor")
tmqCom.stopTmqSimProcess('tmq_sim_new')
pThread.join()
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actTotalRows = 0
for i in range(len(resultList)):
actTotalRows += resultList[i]
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
expectTotalRows = 0
for i in range(len(expectRowsList)):
expectTotalRows += expectRowsList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(actTotalRows, expectTotalRows))
if expectTotalRows <= resultList[0]:
tdLog.info("act consume rows: %d should >= expect consume rows: %d"%(actTotalRows, expectTotalRows))
tdLog.exit("0 tmq consume rows error!")
# time.sleep(10)
# for i in range(len(topicNameList)):
# tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
...@@ -135,3 +135,4 @@ python3 ./test.py -f 7-tmq/stbFilter.py ...@@ -135,3 +135,4 @@ python3 ./test.py -f 7-tmq/stbFilter.py
python3 ./test.py -f 7-tmq/tmqCheckData.py python3 ./test.py -f 7-tmq/tmqCheckData.py
python3 ./test.py -f 7-tmq/tmqUdf.py python3 ./test.py -f 7-tmq/tmqUdf.py
#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5 #python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5
python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
...@@ -22,9 +22,9 @@ ...@@ -22,9 +22,9 @@
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"
#include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "taosdef.h"
#include "types.h" #include "types.h"
#define GREEN "\033[1;32m" #define GREEN "\033[1;32m"
...@@ -36,7 +36,11 @@ ...@@ -36,7 +36,11 @@
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32) #define MAX_VGROUP_CNT (32)
typedef enum { NOTIFY_CMD_START_CONSUM, NOTIFY_CMD_START_COMMIT, NOTIFY_CMD_ID_BUTT } NOTIFY_CMD_ID; typedef enum {
NOTIFY_CMD_START_CONSUM,
NOTIFY_CMD_START_COMMIT,
NOTIFY_CMD_ID_BUTT
}NOTIFY_CMD_ID;
typedef struct { typedef struct {
TdThread thread; TdThread thread;
...@@ -48,8 +52,8 @@ typedef struct { ...@@ -48,8 +52,8 @@ typedef struct {
// char autoOffsetRest[16]; // none, earliest, latest // char autoOffsetRest[16]; // none, earliest, latest
TdFilePtr pConsumeRowsFile; TdFilePtr pConsumeRowsFile;
int32_t ifCheckData; int32_t ifCheckData;
int64_t expectMsgCnt; int64_t expectMsgCnt;
int64_t consumeMsgCnt; int64_t consumeMsgCnt;
int64_t consumeRowCnt; int64_t consumeRowCnt;
...@@ -85,7 +89,6 @@ typedef struct { ...@@ -85,7 +89,6 @@ typedef struct {
int32_t saveRowFlag; int32_t saveRowFlag;
int32_t consumeDelay; // unit s int32_t consumeDelay; // unit s
int32_t numOfThread; int32_t numOfThread;
int32_t useSnapshot;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo; } SConfInfo;
...@@ -93,8 +96,6 @@ static SConfInfo g_stConfInfo; ...@@ -93,8 +96,6 @@ static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL; TdFilePtr g_fp = NULL;
static int running = 1; static int running = 1;
int8_t useSnapshot = 0;
// char* g_pRowValue = NULL; // char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL; // TdFilePtr g_fp = NULL;
...@@ -126,11 +127,23 @@ char* getCurrentTimeString(char* timeString) { ...@@ -126,11 +127,23 @@ char* getCurrentTimeString(char* timeString) {
return timeString; return timeString;
} }
static void tmqStop(int signum, void *info, void *ctx) {
running = 0;
char tmpString[128];
taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum);
}
static void tmqSetSignalHandle() {
taosSetSignal(SIGINT, tmqStop);
}
void initLogFile() { void initLogFile() {
char filename[256]; char filename[256];
char tmpString[128]; char tmpString[128];
sprintf(filename, "%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString)); pid_t process_id = getpid();
sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
#ifdef WINDOWS #ifdef WINDOWS
for (int i = 2; i < sizeof(filename); i++) { for (int i = 2; i < sizeof(filename); i++) {
if (filename[i] == ':') filename[i] = '-'; if (filename[i] == ':') filename[i] = '-';
...@@ -204,8 +217,6 @@ void parseArgument(int32_t argc, char* argv[]) { ...@@ -204,8 +217,6 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo.saveRowFlag = atol(argv[++i]); g_stConfInfo.saveRowFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-y") == 0) { } else if (strcmp(argv[i], "-y") == 0) {
g_stConfInfo.consumeDelay = atol(argv[++i]); g_stConfInfo.consumeDelay = atol(argv[++i]);
} else if (strcmp(argv[i], "-e") == 0) {
useSnapshot = (int8_t)atol(argv[++i]);
} else { } else {
pError("%s unknow para: %s %s", GREEN, argv[++i], NC); pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
exit(-1); exit(-1);
...@@ -299,11 +310,11 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) { ...@@ -299,11 +310,11 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
return 0; return 0;
} }
static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) { static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
// if (shell.args.is_raw_time) { //if (shell.args.is_raw_time) {
// sprintf(buf, "%" PRId64, val); // sprintf(buf, "%" PRId64, val);
// return buf; // return buf;
// } //}
time_t tt; time_t tt;
int32_t ms = 0; int32_t ms = 0;
...@@ -341,7 +352,7 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) { ...@@ -341,7 +352,7 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
} }
} }
struct tm* ptm = taosLocalTime(&tt, NULL); struct tm *ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
...@@ -355,8 +366,7 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) { ...@@ -355,8 +366,7 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
return buf; return buf;
} }
static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length, static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision) {
int32_t precision) {
if (val == NULL) { if (val == NULL) {
taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR); taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR);
return; return;
...@@ -366,31 +376,31 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f ...@@ -366,31 +376,31 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f
char buf[TSDB_MAX_BYTES_PER_ROW]; char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) { switch (field->type) {
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
taosFprintfFile(pFile, "%d", ((((int32_t)(*((char*)val))) == 1) ? 1 : 0)); taosFprintfFile(pFile, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0));
break; break;
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
taosFprintfFile(pFile, "%d", *((int8_t*)val)); taosFprintfFile(pFile, "%d", *((int8_t *)val));
break; break;
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:
taosFprintfFile(pFile, "%u", *((uint8_t*)val)); taosFprintfFile(pFile, "%u", *((uint8_t *)val));
break; break;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
taosFprintfFile(pFile, "%d", *((int16_t*)val)); taosFprintfFile(pFile, "%d", *((int16_t *)val));
break; break;
case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_USMALLINT:
taosFprintfFile(pFile, "%u", *((uint16_t*)val)); taosFprintfFile(pFile, "%u", *((uint16_t *)val));
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
taosFprintfFile(pFile, "%d", *((int32_t*)val)); taosFprintfFile(pFile, "%d", *((int32_t *)val));
break; break;
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
taosFprintfFile(pFile, "%u", *((uint32_t*)val)); taosFprintfFile(pFile, "%u", *((uint32_t *)val));
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
taosFprintfFile(pFile, "%" PRId64, *((int64_t*)val)); taosFprintfFile(pFile, "%" PRId64, *((int64_t *)val));
break; break;
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t*)val)); taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val));
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val)); taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
...@@ -411,7 +421,7 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f ...@@ -411,7 +421,7 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f
taosFprintfFile(pFile, "\'%s\'", buf); taosFprintfFile(pFile, "\'%s\'", buf);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
shellFormatTimestamp(buf, *(int64_t*)val, precision); shellFormatTimestamp(buf, *(int64_t *)val, precision);
taosFprintfFile(pFile, "'%s'", buf); taosFprintfFile(pFile, "'%s'", buf);
break; break;
default: default:
...@@ -419,13 +429,12 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f ...@@ -419,13 +429,12 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f
} }
} }
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields, static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields, int32_t precision) {
int32_t precision) {
for (int32_t i = 0; i < num_fields; i++) { for (int32_t i = 0; i < num_fields; i++) {
if (i > 0) { if (i > 0) {
taosFprintfFile(pFile, "\n"); taosFprintfFile(pFile, "\n");
} }
shellDumpFieldToFile(pFile, (const char*)row[i], fields + i, length[i], precision); shellDumpFieldToFile(pFile, (const char *)row[i], fields + i, length[i], precision);
} }
taosFprintfFile(pFile, "\n"); taosFprintfFile(pFile, "\n");
} }
...@@ -435,42 +444,40 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) ...@@ -435,42 +444,40 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
int32_t totalRows = 0; int32_t totalRows = 0;
// printf("topic: %s\n", tmq_get_topic_name(msg)); // printf("topic: %s\n", tmq_get_topic_name(msg));
int32_t vgroupId = tmq_get_vgroup_id(msg); int32_t vgroupId = tmq_get_vgroup_id(msg);
const char* dbName = tmq_get_db_name(msg); const char* dbName = tmq_get_db_name(msg);
taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex); taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", tmq_get_topic_name(msg), vgroupId);
tmq_get_topic_name(msg), vgroupId);
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg); int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg); int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg); const char* tbName = tmq_get_table_name(msg);
#if 0 #if 0
// get schema // get schema
//============================== stub =================================================// //============================== stub =================================================//
for (int32_t i = 0; i < numOfFields; i++) { for (int32_t i = 0; i < numOfFields; i++) {
taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes); taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
} }
//============================== stub =================================================// //============================== stub =================================================//
#endif #endif
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision); dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
if (0 != g_stConfInfo.showRowFlag) { if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf); taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
// if (0 != g_stConfInfo.saveRowFlag) { //if (0 != g_stConfInfo.saveRowFlag) {
// saveConsumeContentToTbl(pInfo, buf); // saveConsumeContentToTbl(pInfo, buf);
// } //}
} }
totalRows++; totalRows++;
...@@ -493,7 +500,8 @@ int queryDB(TAOS* taos, char* command) { ...@@ -493,7 +500,8 @@ int queryDB(TAOS* taos, char* command) {
return 0; return 0;
} }
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {} static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {
}
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
...@@ -501,8 +509,11 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { ...@@ -501,8 +509,11 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, now, cmdId, sprintf(sqlStr, "insert into %s.notifyinfo values (%"PRId64", %d, %d)",
pInfo->consumerId); g_stConfInfo.cdbName,
now,
cmdId,
pInfo->consumerId);
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL); taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
...@@ -512,14 +523,16 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { ...@@ -512,14 +523,16 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
} }
static int32_t g_once_commit_flag = 0; static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
pError("tmq_commit_cb_print() commit %d\n", code); pError("tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) { if (0 == g_once_commit_flag) {
g_once_commit_flag = 1; g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
} }
taosFprintfFile(g_fp, "tmq_commit_cb_print() be called\n");
char tmpString[128];
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
} }
void build_consumer(SThreadInfo* pInfo) { void build_consumer(SThreadInfo* pInfo) {
...@@ -551,10 +564,6 @@ void build_consumer(SThreadInfo* pInfo) { ...@@ -551,10 +564,6 @@ void build_consumer(SThreadInfo* pInfo) {
// tmq_conf_set(conf, "auto.offset.reset", "none"); // tmq_conf_set(conf, "auto.offset.reset", "none");
// tmq_conf_set(conf, "auto.offset.reset", "earliest"); // tmq_conf_set(conf, "auto.offset.reset", "earliest");
// tmq_conf_set(conf, "auto.offset.reset", "latest"); // tmq_conf_set(conf, "auto.offset.reset", "latest");
//
if (useSnapshot) {
tmq_conf_set(conf, "experiment.use.snapshot", "true");
}
pInfo->tmq = tmq_consumer_new(conf, NULL, 0); pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
...@@ -609,13 +618,12 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -609,13 +618,12 @@ void loop_consume(SThreadInfo* pInfo) {
pInfo->consumerId); pInfo->consumerId);
pInfo->ts = taosGetTimestampMs(); pInfo->ts = taosGetTimestampMs();
if (pInfo->ifCheckData) { if (pInfo->ifCheckData) {
char filename[256] = {0}; char filename[256] = {0};
char tmpString[128]; char tmpString[128];
// sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, //sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, getCurrentTimeString(tmpString));
// getCurrentTimeString(tmpString)); sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (pInfo->pConsumeRowsFile == NULL) { if (pInfo->pConsumeRowsFile == NULL) {
taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString)); taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString));
...@@ -634,10 +642,10 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -634,10 +642,10 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++; totalMsgs++;
if (0 == once_flag) { if (0 == once_flag) {
once_flag = 1; once_flag = 1;
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
} }
if (totalRows >= pInfo->expectMsgCnt) { if (totalRows >= pInfo->expectMsgCnt) {
char tmpString[128]; char tmpString[128];
...@@ -651,6 +659,10 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -651,6 +659,10 @@ void loop_consume(SThreadInfo* pInfo) {
} }
} }
if (0 == running) {
taosFprintfFile(g_fp, "receive stop signal and not continue consume\n");
}
pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeMsgCnt = totalMsgs;
pInfo->consumeRowCnt = totalRows; pInfo->consumeRowCnt = totalRows;
...@@ -666,7 +678,7 @@ void* consumeThreadFunc(void* param) { ...@@ -666,7 +678,7 @@ void* consumeThreadFunc(void* param) {
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pInfo->taos == NULL) { if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n"); taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
return NULL; return NULL;
} }
build_consumer(pInfo); build_consumer(pInfo);
...@@ -680,7 +692,7 @@ void* consumeThreadFunc(void* param) { ...@@ -680,7 +692,7 @@ void* consumeThreadFunc(void* param) {
int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err != 0) { if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
taosFprintfFile(g_fp, "tmq_subscribe()! reason: %s\n", tmq_err2str(err)); taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
assert(0); assert(0);
return NULL; return NULL;
} }
...@@ -829,6 +841,8 @@ int main(int32_t argc, char* argv[]) { ...@@ -829,6 +841,8 @@ int main(int32_t argc, char* argv[]) {
getConsumeInfo(); getConsumeInfo();
saveConfigToLogFile(); saveConfigToLogFile();
tmqSetSignalHandle();
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); taosThreadAttrInit(&thattr);
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册