未验证 提交 d538f9b8 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #12381 from taosdata/test-v3.0/lihui

test: add test case for tmq
...@@ -13,14 +13,12 @@ from util.dnodes import * ...@@ -13,14 +13,12 @@ from util.dnodes import *
class TDTestCase: class TDTestCase:
hostname = socket.gethostname() hostname = socket.gethostname()
rpcDebugFlagVal = '143' #rpcDebugFlagVal = '143'
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal #print ("===================: ", updatecfgDict)
print ("===================: ", updatecfgDict)
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
...@@ -43,27 +41,35 @@ class TDTestCase: ...@@ -43,27 +41,35 @@ class TDTestCase:
break break
return buildPath return buildPath
def create_tables(self,dbName,vgroups,stbName,ctbNum,rowsPerTbl): def newcur(self,cfg,host,port):
tdSql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) user = "root"
tdSql.execute("use %s" %dbName) password = "taosdata"
tdSql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
cur=con.cursor()
print(cur)
return cur
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName)
tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
pre_create = "create table" pre_create = "create table"
sql = pre_create sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum): for i in range(ctbNum):
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
if (i > 0) and (i%100 == 0): if (i > 0) and (i%100 == 0):
tdSql.execute(sql) tsql.execute(sql)
sql = pre_create sql = pre_create
if sql != pre_create: if sql != pre_create:
tdSql.execute(sql) tsql.execute(sql)
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return return
def insert_data(self,dbName,stbName,ctbNum,rowsPerTbl,startTs): def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............") tdLog.debug("start to insert data ............")
tdSql.execute("use %s" %dbName) tsql.execute("use %s" %dbName)
pre_insert = "insert into " pre_insert = "insert into "
sql = pre_insert sql = pre_insert
...@@ -72,31 +78,38 @@ class TDTestCase: ...@@ -72,31 +78,38 @@ class TDTestCase:
sql += " %s_%d values "%(stbName,i) sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl): for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and (j%2000 == 0): if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tdSql.execute(sql) tsql.execute(sql)
sql = "insert into %s_%d values " %(stbName,i) if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i)
else:
sql = "insert into "
#end sql #end sql
if sql != pre_insert: if sql != pre_insert:
# print(sql) #print("insert sql:%s"%sql)
print("sql:%s"%sql) tsql.execute(sql)
tdSql.execute(sql)
tdLog.debug("insert data ............ [OK]") tdLog.debug("insert data ............ [OK]")
return return
def prepareEnv(self, **parameterDict): def prepareEnv(self, **parameterDict):
print ("input parameters:") print ("input parameters:")
print (parameterDict) print (parameterDict)
self.create_tables(parameterDict["dbName"],\ # create new connector for my thread
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
self.create_tables(tsql,\
parameterDict["dbName"],\
parameterDict["vgroups"],\ parameterDict["vgroups"],\
parameterDict["stbName"],\ parameterDict["stbName"],\
parameterDict["ctbNum"],\ parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"]) parameterDict["rowsPerTbl"])
self.insert_data(parameterDict["dbName"],\ self.insert_data(tsql,\
parameterDict["stbName"],\ parameterDict["dbName"],\
parameterDict["ctbNum"],\ parameterDict["stbName"],\
parameterDict["rowsPerTbl"],\ parameterDict["ctbNum"],\
parameterDict["startTs"]) parameterDict["rowsPerTbl"],\
parameterDict["batchNum"],\
parameterDict["startTs"])
return return
def run(self): def run(self):
...@@ -113,17 +126,114 @@ class TDTestCase: ...@@ -113,17 +126,114 @@ class TDTestCase:
tdLog.printNoPrefix("======== test scenario 1: ") tdLog.printNoPrefix("======== test scenario 1: ")
tdLog.info("step 1: create database, stb, ctb and insert data") tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread # create and start thread
parameterDict = {'dbName': 'db', \ parameterDict = {'cfg': '', \
'dbName': 'db', \
'vgroups': 1, \ 'vgroups': 1, \
'stbName': 'stb', \ 'stbName': 'stb', \
'ctbNum': 10, \ 'ctbNum': 10, \
'rowsPerTbl': 10, \ 'rowsPerTbl': 10000, \
'batchNum': 10, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start() prepareEnvThread.start()
time.sleep(2)
# wait stb ready
while 1:
#tdSql.query("show %s.stables"%parameterDict['dbName'])
tdSql.query("show db.stables")
#print (self.queryResult)
#print (tdSql.getRows())
if tdSql.getRows() == 1:
break
else:
time.sleep(1)
tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column'
topicFromCtb = 'topic_ctb_column'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1)
tdSql.query("show topics")
print ("======================================")
#print (self.queryResult)
#tdSql.checkRows(2)
topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0)
print (topic1)
print (topic2)
print (topicFromStb)
print (topicFromCtb)
#tdLog.info("show topics: %s, %s"%topic1, topic2)
#if topic1 != topicFromStb or topic1 != topicFromCtb:
# tdLog.exit("topic error1")
#if topic2 != topicFromStb or topic2 != topicFromCtb:
# tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
#tdSql.query("create database %s"%cdbName)
#tdSql.query("use %s"%cdbName)
tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
countOfStb = tdSql.getData(0, 0)
if countOfStb != 0:
tdLog.info("count from stb: %d"%countOfStb)
break
else:
time.sleep(1)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from consumeresult")
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdLog.printNoPrefix("======== test scenario 2: ") tdLog.printNoPrefix("======== test scenario 2: ")
......
...@@ -51,3 +51,7 @@ python3 ./test.py -f 2-query/arcsin.py ...@@ -51,3 +51,7 @@ python3 ./test.py -f 2-query/arcsin.py
python3 ./test.py -f 2-query/arccos.py python3 ./test.py -f 2-query/arccos.py
python3 ./test.py -f 2-query/arctan.py python3 ./test.py -f 2-query/arctan.py
# python3 ./test.py -f 2-query/query_cols_tags_and_or.py # python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 7-tmq/basic5.py
/* /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation. * or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT * This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. * FITNESS FOR A PARTICULAR PURPOSE.
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#define GREEN "\033[1;32m" #define GREEN "\033[1;32m"
#define NC "\033[0m" #define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b)) #define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024) #define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
typedef struct { typedef struct {
TdThread thread; TdThread thread;
int32_t consumerId; int32_t consumerId;
int32_t ifCheckData; int32_t autoCommitIntervalMs; // 1000 ms
int64_t expectMsgCnt; char autoCommit[8]; // true, false
char autoOffsetRest[16]; // none, earliest, latest
int64_t consumeMsgCnt;
int64_t consumeRowCnt; int32_t ifCheckData;
int32_t checkresult; int64_t expectMsgCnt;
char topicString[1024]; int64_t consumeMsgCnt;
char keyString[1024]; int64_t consumeRowCnt;
int32_t checkresult;
int32_t numOfTopic;
char topics[32][64]; char topicString[1024];
char keyString[1024];
int32_t numOfKey;
char key[32][64]; int32_t numOfTopic;
char value[32][64]; char topics[32][64];
tmq_t* tmq; int32_t numOfKey;
tmq_list_t* topicList; char key[32][64];
char value[32][64];
} SThreadInfo;
tmq_t* tmq;
typedef struct { tmq_list_t* topicList;
// input from argvs
char cdbName[32]; } SThreadInfo;
char dbName[32];
int32_t showMsgFlag; typedef struct {
int32_t showRowFlag; // input from argvs
int32_t consumeDelay; // unit s char cdbName[32];
int32_t numOfThread; char dbName[32];
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; int32_t showMsgFlag;
} SConfInfo; int32_t showRowFlag;
int32_t consumeDelay; // unit s
static SConfInfo g_stConfInfo; int32_t numOfThread;
TdFilePtr g_fp = NULL; SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo;
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL; static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL;
static void printHelp() {
char indent[10] = " "; // char* g_pRowValue = NULL;
printf("Used to test the tmq feature with sim cases\n"); // TdFilePtr g_fp = NULL;
printf("%s%s\n", indent, "-c"); static void printHelp() {
printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); char indent[10] = " ";
printf("%s%s\n", indent, "-d"); printf("Used to test the tmq feature with sim cases\n");
printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
printf("%s%s\n", indent, "-g"); printf("%s%s\n", indent, "-c");
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
printf("%s%s\n", indent, "-r"); printf("%s%s\n", indent, "-d");
printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag); printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
printf("%s%s\n", indent, "-y"); printf("%s%s\n", indent, "-g");
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
exit(EXIT_SUCCESS); printf("%s%s\n", indent, "-r");
} printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
printf("%s%s\n", indent, "-y");
void initLogFile() { printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); exit(EXIT_SUCCESS);
char file[256]; }
sprintf(file, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); void initLogFile() {
if (NULL == pFile) { // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); char file[256];
exit(-1); sprintf(file, "%s/../log/tmqlog.txt", configDir);
} TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
g_fp = pFile; if (NULL == pFile) {
} fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
exit(-1);
void saveConfigToLogFile() { }
time_t tTime = taosGetTimestampSec(); g_fp = pFile;
struct tm tm = *taosLocalTime(&tTime, NULL); }
taosFprintfFile(g_fp, "###################################################################\n"); void saveConfigToLogFile() {
taosFprintfFile(g_fp, "# configDir: %s\n", configDir); time_t tTime = taosGetTimestampSec();
taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName); struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName);
taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); taosFprintfFile(g_fp, "###################################################################\n");
taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag); taosFprintfFile(g_fp, "# configDir: %s\n", configDir);
taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName);
taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread); taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName);
taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag);
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
taosFprintfFile(g_fp, " Topics: "); taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread);
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
} taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
taosFprintfFile(g_fp, " Key: "); taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) { taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); taosFprintfFile(g_fp, " Topics: ");
} for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
} }
taosFprintfFile(g_fp, "\n");
taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, taosFprintfFile(g_fp, " Key: ");
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) {
taosFprintfFile(g_fp, "###################################################################\n"); taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
} }
taosFprintfFile(g_fp, "\n");
void parseArgument(int32_t argc, char* argv[]) { }
memset(&g_stConfInfo, 0, sizeof(SConfInfo));
g_stConfInfo.showMsgFlag = 0; taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
g_stConfInfo.showRowFlag = 0; tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
g_stConfInfo.consumeDelay = 5; taosFprintfFile(g_fp, "###################################################################\n");
}
for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { void parseArgument(int32_t argc, char* argv[]) {
printHelp(); memset(&g_stConfInfo, 0, sizeof(SConfInfo));
exit(0); g_stConfInfo.showMsgFlag = 0;
} else if (strcmp(argv[i], "-d") == 0) { g_stConfInfo.showRowFlag = 0;
strcpy(g_stConfInfo.dbName, argv[++i]); g_stConfInfo.consumeDelay = 5;
} else if (strcmp(argv[i], "-w") == 0) {
strcpy(g_stConfInfo.cdbName, argv[++i]); for (int32_t i = 1; i < argc; i++) {
} else if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
strcpy(configDir, argv[++i]); printHelp();
} else if (strcmp(argv[i], "-g") == 0) { exit(0);
g_stConfInfo.showMsgFlag = atol(argv[++i]); } else if (strcmp(argv[i], "-d") == 0) {
} else if (strcmp(argv[i], "-r") == 0) { strcpy(g_stConfInfo.dbName, argv[++i]);
g_stConfInfo.showRowFlag = atol(argv[++i]); } else if (strcmp(argv[i], "-w") == 0) {
} else if (strcmp(argv[i], "-y") == 0) { strcpy(g_stConfInfo.cdbName, argv[++i]);
g_stConfInfo.consumeDelay = atol(argv[++i]); } else if (strcmp(argv[i], "-c") == 0) {
} else { strcpy(configDir, argv[++i]);
printf("%s unknow para: %s %s", GREEN, argv[++i], NC); } else if (strcmp(argv[i], "-g") == 0) {
exit(-1); g_stConfInfo.showMsgFlag = atol(argv[++i]);
} } else if (strcmp(argv[i], "-r") == 0) {
} g_stConfInfo.showRowFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-y") == 0) {
initLogFile(); g_stConfInfo.consumeDelay = atol(argv[++i]);
} else {
taosFprintfFile(g_fp, "====parseArgument() success\n"); printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
exit(-1);
#if 1 }
pPrint("%s configDir:%s %s", GREEN, configDir, NC); }
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC); initLogFile();
pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); taosFprintfFile(g_fp, "====parseArgument() success\n");
pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
#endif #if 1
} pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
void splitStr(char** arr, char* str, const char* del) { pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
char* s = strtok(str, del); pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
while (s != NULL) { pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
*arr++ = s; pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
s = strtok(NULL, del); #endif
} }
}
void splitStr(char** arr, char* str, const char* del) {
void ltrim(char* str) { char* s = strtok(str, del);
if (str == NULL || *str == '\0') { while (s != NULL) {
return; *arr++ = s;
} s = strtok(NULL, del);
int len = 0; }
char* p = str; }
while (*p != '\0' && isspace(*p)) {
++p; void ltrim(char* str) {
++len; if (str == NULL || *str == '\0') {
} return;
memmove(str, p, strlen(str) - len + 1); }
// return str; int len = 0;
} char* p = str;
while (*p != '\0' && isspace(*p)) {
static int running = 1; ++p;
static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) { ++len;
char buf[1024]; }
int32_t totalRows = 0; memmove(str, p, strlen(str) - len + 1);
// return str;
// printf("topic: %s\n", tmq_get_topic_name(msg)); }
// printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable); static int running = 1;
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) {
char buf[1024];
while (1) { int32_t totalRows = 0;
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; // printf("topic: %s\n", tmq_get_topic_name(msg));
if (0 != g_stConfInfo.showRowFlag) { // printf("vg:%d\n", tmq_get_vgroup_id(msg));
TAOS_FIELD* fields = taos_fetch_fields(msg); taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable);
int32_t numOfFields = taos_field_count(msg); taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
taos_print_row(buf, row, fields, numOfFields);
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); while (1) {
} TAOS_ROW row = taos_fetch_row(msg);
totalRows++; if (row == NULL) break;
} if (0 != g_stConfInfo.showRowFlag) {
TAOS_FIELD* fields = taos_fetch_fields(msg);
return totalRows; int32_t numOfFields = taos_field_count(msg);
} taos_print_row(buf, row, fields, numOfFields);
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
int queryDB(TAOS* taos, char* command) { }
TAOS_RES* pRes = taos_query(taos, command); totalRows++;
int code = taos_errno(pRes); }
// if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
if (code != 0) { return totalRows;
pError("failed to reason:%s, sql: %s", tstrerror(code), command); }
taos_free_result(pRes);
return -1; int queryDB(TAOS* taos, char* command) {
} TAOS_RES* pRes = taos_query(taos, command);
taos_free_result(pRes); int code = taos_errno(pRes);
return 0; // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
} if (code != 0) {
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { taos_free_result(pRes);
printf("tmq_commit_cb_print() commit %d\n", resp); return -1;
} }
taos_free_result(pRes);
void build_consumer(SThreadInfo* pInfo) { return 0;
tmq_conf_t* conf = tmq_conf_new(); }
// tmq_conf_set(conf, "td.connect.ip", "localhost"); static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
// tmq_conf_set(conf, "td.connect.port", "6030"); printf("tmq_commit_cb_print() commit %d\n", resp);
tmq_conf_set(conf, "td.connect.user", "root"); }
tmq_conf_set(conf, "td.connect.pass", "taosdata");
void build_consumer(SThreadInfo* pInfo) {
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); // tmq_conf_set(conf, "td.connect.ip", "localhost");
// tmq_conf_set(conf, "td.connect.port", "6030");
// tmq_conf_set(conf, "group.id", "cgrp1"); tmq_conf_set(conf, "td.connect.user", "root");
for (int32_t i = 0; i < pInfo->numOfKey; i++) { tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
} //tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
// tmq_conf_set(conf, "client.id", "c-001"); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
// tmq_conf_set(conf, "enable.auto.commit", "true"); // tmq_conf_set(conf, "group.id", "cgrp1");
// tmq_conf_set(conf, "enable.auto.commit", "false"); for (int32_t i = 0; i < pInfo->numOfKey; i++) {
tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
// tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); }
// tmq_conf_set(conf, "auto.offset.reset", "none"); // tmq_conf_set(conf, "client.id", "c-001");
// tmq_conf_set(conf, "auto.offset.reset", "earliest");
// tmq_conf_set(conf, "auto.offset.reset", "latest"); // tmq_conf_set(conf, "enable.auto.commit", "true");
// tmq_conf_set(conf, "enable.auto.commit", "false");
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
return; // tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
}
// tmq_conf_set(conf, "auto.offset.reset", "none");
void build_topic_list(SThreadInfo* pInfo) { // tmq_conf_set(conf, "auto.offset.reset", "earliest");
pInfo->topicList = tmq_list_new(); // tmq_conf_set(conf, "auto.offset.reset", "latest");
// tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < pInfo->numOfTopic; i++) { pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
tmq_list_append(pInfo->topicList, pInfo->topics[i]);
} tmq_conf_destroy(conf);
return;
} return;
}
int32_t saveConsumeResult(SThreadInfo* pInfo) {
char sqlStr[1024] = {0}; void build_topic_list(SThreadInfo* pInfo) {
pInfo->topicList = tmq_list_new();
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); // tmq_list_append(topic_list, "test_stb_topic_1");
assert(pConn != NULL); for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
tmq_list_append(pInfo->topicList, pInfo->topics[i]);
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int }
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName, return;
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); }
TAOS_RES* pRes = taos_query(pConn, sqlStr); int32_t saveConsumeResult(SThreadInfo* pInfo) {
if (taos_errno(pRes) != 0) { char sqlStr[1024] = {0};
printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
exit(-1); assert(pConn != NULL);
}
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
taos_free_result(pRes); sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
return 0;
} TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
void loop_consume(SThreadInfo* pInfo) { printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
tmq_resp_err_t err; taos_free_result(pRes);
exit(-1);
int64_t totalMsgs = 0; }
int64_t totalRows = 0;
taos_free_result(pRes);
while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); return 0;
if (tmqMsg) { }
if (0 != g_stConfInfo.showMsgFlag) {
totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId); void loop_consume(SThreadInfo* pInfo) {
} tmq_resp_err_t err;
taos_free_result(tmqMsg); int64_t totalMsgs = 0;
int64_t totalRows = 0;
totalMsgs++;
while (running) {
if (totalMsgs >= pInfo->expectMsgCnt) { TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
break; if (tmqMsg) {
} if (0 != g_stConfInfo.showMsgFlag) {
} else { totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId);
break; }
}
} taos_free_result(tmqMsg);
pInfo->consumeMsgCnt = totalMsgs; totalMsgs++;
pInfo->consumeRowCnt = totalRows;
if (totalMsgs >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n", taosFprintfFile(g_fp, "==== totalMsgs >= pInfo->expectMsgCnt, so break\n");
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); break;
} }
} else {
void* consumeThreadFunc(void* param) { taosFprintfFile(g_fp, "==== delay over time, so break\n");
int32_t totalMsgs = 0; break;
}
SThreadInfo* pInfo = (SThreadInfo*)param; }
build_consumer(pInfo); pInfo->consumeMsgCnt = totalMsgs;
build_topic_list(pInfo); pInfo->consumeRowCnt = totalRows;
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
return NULL; taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
} pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
}
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) { void* consumeThreadFunc(void* param) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); int32_t totalMsgs = 0;
exit(-1);
} SThreadInfo* pInfo = (SThreadInfo*)param;
loop_consume(pInfo); build_consumer(pInfo);
build_topic_list(pInfo);
tmq_commit(pInfo->tmq, NULL, 0); if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
return NULL;
err = tmq_unsubscribe(pInfo->tmq); }
if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
pInfo->consumeMsgCnt = -1; if (err) {
return NULL; printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
} exit(-1);
}
err = tmq_consumer_close(pInfo->tmq);
if (err) { tmq_list_destroy(pInfo->topicList);
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); pInfo->topicList = NULL;
exit(-1);
} loop_consume(pInfo);
pInfo->tmq = NULL;
tmq_commit(pInfo->tmq, NULL, 0);
// save consume result into consumeresult table
saveConsumeResult(pInfo); err = tmq_unsubscribe(pInfo->tmq);
if (err) {
return NULL; printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
} pInfo->consumeMsgCnt = -1;
return NULL;
void parseConsumeInfo() { }
char* token;
const char delim[2] = ","; err = tmq_consumer_close(pInfo->tmq);
const char ch = ':'; if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { exit(-1);
token = strtok(g_stConfInfo.stThreads[i].topicString, delim); }
while (token != NULL) { pInfo->tmq = NULL;
// printf("%s\n", token );
strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token); // save consume result into consumeresult table
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); saveConsumeResult(pInfo);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.stThreads[i].numOfTopic++; return NULL;
}
token = strtok(NULL, delim);
} void parseConsumeInfo() {
char* token;
token = strtok(g_stConfInfo.stThreads[i].keyString, delim); const char delim[2] = ",";
while (token != NULL) { const char ch = ':';
// printf("%s\n", token );
{ for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
char* pstr = token; token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
ltrim(pstr); while (token != NULL) {
char* ret = strchr(pstr, ch); // printf("%s\n", token );
memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr); strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1); ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
// g_stConfInfo.value[g_stConfInfo.numOfKey]); g_stConfInfo.stThreads[i].numOfTopic++;
g_stConfInfo.stThreads[i].numOfKey++;
} token = strtok(NULL, delim);
}
token = strtok(NULL, delim);
} token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
} while (token != NULL) {
} // printf("%s\n", token );
{
int32_t getConsumeInfo() { char* pstr = token;
char sqlStr[1024] = {0}; ltrim(pstr);
char* ret = strchr(pstr, ch);
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
assert(pConn != NULL); strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName); // g_stConfInfo.value[g_stConfInfo.numOfKey]);
TAOS_RES* pRes = taos_query(pConn, sqlStr); g_stConfInfo.stThreads[i].numOfKey++;
if (taos_errno(pRes) != 0) { }
printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); token = strtok(NULL, delim);
taosCloseFile(&g_fp); }
taos_free_result(pRes); }
exit(-1); }
}
int32_t getConsumeInfo() {
TAOS_ROW row = NULL; char sqlStr[1024] = {0};
int num_fields = taos_num_fields(pRes);
TAOS_FIELD* fields = taos_fetch_fields(pRes); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
// ifcheckdata int sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
int32_t numOfThread = 0; if (taos_errno(pRes) != 0) {
while ((row = taos_fetch_row(pRes))) { printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
int32_t* lengths = taos_fetch_lengths(pRes); taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosCloseFile(&g_fp);
for (int i = 0; i < num_fields; ++i) { taos_free_result(pRes);
if (row[i] == NULL || 0 == i) { exit(-1);
continue; }
}
TAOS_ROW row = NULL;
if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { int num_fields = taos_num_fields(pRes);
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]); TAOS_FIELD* fields = taos_fetch_fields(pRes);
} else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
} else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { // ifcheckdata int
memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
} else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { int32_t numOfThread = 0;
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]); while ((row = taos_fetch_row(pRes))) {
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { int32_t* lengths = taos_fetch_lengths(pRes);
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
} // set default value
} g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
numOfThread++; memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
} memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
g_stConfInfo.numOfThread = numOfThread;
for (int i = 0; i < num_fields; ++i) {
taos_free_result(pRes); if (row[i] == NULL || 0 == i) {
continue;
parseConsumeInfo(); }
return 0; if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
} g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
} else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
int main(int32_t argc, char* argv[]) { memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
parseArgument(argc, argv); } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
getConsumeInfo(); memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
saveConfigToLogFile(); } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
TdThreadAttr thattr; } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
taosThreadAttrInit(&thattr); g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, row[i], lengths[i]);
// pthread_create one thread to consume } else if ((7 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread); g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = *((int32_t*)row[i]);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { } else if ((8 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, row[i], lengths[i]);
(void*)(&(g_stConfInfo.stThreads[i]))); }
} }
numOfThread++;
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { }
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); g_stConfInfo.numOfThread = numOfThread;
}
taos_free_result(pRes);
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
parseConsumeInfo();
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp); return 0;
}
return 0;
} int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv);
getConsumeInfo();
saveConfigToLogFile();
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
// pthread_create one thread to consume
taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
(void*)(&(g_stConfInfo.stThreads[i])));
}
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
}
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册