提交 a44725f1 编写于 作者: P plum-lihui

[test: refactor tmq test script]

上级 c273b2d5
...@@ -55,7 +55,7 @@ ...@@ -55,7 +55,7 @@
# ---- tmq # ---- tmq
./test.sh -f tsim/tmq/basic.sim ./test.sh -f tsim/tmq/basic.sim
./test.sh -f tsim/tmq/basic1.sim #./test.sh -f tsim/tmq/basic1.sim
#./test.sh -f tsim/tmq/oneTopic.sim #./test.sh -f tsim/tmq/oneTopic.sim
#./test.sh -f tsim/tmq/multiTopic.sim #./test.sh -f tsim/tmq/multiTopic.sim
......
#!/bin/bash
##################################################
#
# Do tmq test
#
##################################################
set +e
# set default value for parameters
EXEC_OPTON=start
DB_NAME=db
POLL_DELAY=5
VALGRIND=0
SIGNAL=SIGINT
while getopts "d:s:v:y:x:" arg
do
case $arg in
d)
DB_NAME=$OPTARG
;;
s)
EXEC_OPTON=$OPTARG
;;
v)
VALGRIND=1
;;
y)
POLL_DELAY=$OPTARG
;;
x)
SIGNAL=$OPTARG
;;
?)
echo "unkown argument"
;;
esac
done
SCRIPT_DIR=`pwd`
IN_TDINTERNAL="community"
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
cd ../../..
else
cd ../../
fi
TOP_DIR=`pwd`
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2,3`
else
BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2`
fi
declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR
declare -x SIM_DIR=$TOP_DIR/sim
PROGRAM=$BUILD_DIR/build/bin/tmq_sim
PRG_DIR=$SIM_DIR/tsim
CFG_DIR=$PRG_DIR/cfg
LOG_DIR=$PRG_DIR/log
echo "------------------------------------------------------------------------"
echo "BUILD_DIR: $BUILD_DIR"
echo "SIM_DIR : $SIM_DIR"
echo "CFG_DIR : $CFG_DIR"
echo "PROGRAM: $PROGRAM
echo "CFG_DIR: $CFG_DIR
echo "POLL_DELAY: $POLL_DELAY
echo "DB_NAME: $DB_NAME
echo "------------------------------------------------------------------------"
if [ "$EXEC_OPTON" = "start" ]; then
if [ $VALGRIND -eq 1 ]; then
echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &
nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &
else
echo "nohup $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &"
nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME > /dev/null 2>&1 &
fi
else
PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]
do
if [ "$SIGNAL" = "SIGKILL" ]; then
echo try to kill by signal SIGKILL
kill -9 $PID
else
echo try to kill by signal SIGINT
kill -SIGINT $PID
fi
sleep 1
PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'`
done
fi
...@@ -31,44 +31,46 @@ ...@@ -31,44 +31,46 @@
#define NC "\033[0m" #define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b)) #define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024) #define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
typedef struct { typedef struct {
int32_t expectMsgCnt; TdThread thread;
int32_t consumeMsgCnt; int32_t consumerId;
TdThread thread;
int32_t ifCheckData;
int64_t expectMsgCnt;
int64_t consumeMsgCnt;
int32_t checkresult;
char topicString[1024];
char keyString[1024];
int32_t numOfTopic;
char topics[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
tmq_t* tmq;
tmq_list_t* topicList;
} SThreadInfo; } SThreadInfo;
typedef struct { typedef struct {
// input from argvs // input from argvs
char dbName[32]; char dbName[32];
char topicString[256]; int32_t showMsgFlag;
char keyString[1024]; int32_t consumeDelay; // unit s
char topicString1[256]; int32_t numOfThread;
char keyString1[1024]; SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
int32_t showMsgFlag;
int32_t consumeDelay; // unit s
int32_t consumeMsgCnt;
int32_t checkMode;
// save result after parse agrvs
int32_t numOfTopic;
char topics[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
int32_t numOfTopic1;
char topics1[32][64];
int32_t numOfKey1;
char key1[32][64];
char value1[32][64];
} SConfInfo; } SConfInfo;
static SConfInfo g_stConfInfo; static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL;
// char* g_pRowValue = NULL; // char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL; // TdFilePtr g_fp = NULL;
...@@ -81,30 +83,54 @@ static void printHelp() { ...@@ -81,30 +83,54 @@ static void printHelp() {
printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
printf("%s%s\n", indent, "-d"); printf("%s%s\n", indent, "-d");
printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default "); printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent, "The topic string for cosumer, no default ");
printf("%s%s\n", indent, "-k");
printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default ");
printf("%s%s\n", indent, "-t1");
printf("%s%s%s\n", indent, indent, "The topic1 string for cosumer, no default ");
printf("%s%s\n", indent, "-k1");
printf("%s%s%s\n", indent, indent, "The key1-value1 string for cosumer, no default ");
printf("%s%s\n", indent, "-g"); printf("%s%s\n", indent, "-g");
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
printf("%s%s\n", indent, "-y"); printf("%s%s\n", indent, "-y");
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
printf("%s%s\n", indent, "-m");
printf("%s%s%s%d\n", indent, indent, "consume msg count, default is s", g_stConfInfo.consumeMsgCnt);
printf("%s%s\n", indent, "-j");
printf("%s%s%s%d\n", indent, indent, "check mode, default is s", g_stConfInfo.checkMode);
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
void initLogFile() {
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
exit -1;
};
g_fp = pFile;
time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(pFile, "###################################################################\n");
taosFprintfFile(pFile, "# configDir: %s\n", configDir);
taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName);
taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(pFile, " Topics: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
}
taosFprintfFile(pFile, "\n");
taosFprintfFile(pFile, " Key: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
}
taosFprintfFile(pFile, "\n");
}
taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
taosFprintfFile(pFile, "###################################################################\n");
}
void parseArgument(int32_t argc, char* argv[]) { void parseArgument(int32_t argc, char* argv[]) {
memset(&g_stConfInfo, 0, sizeof(SConfInfo)); memset(&g_stConfInfo, 0, sizeof(SConfInfo));
g_stConfInfo.showMsgFlag = 0; g_stConfInfo.showMsgFlag = 0;
g_stConfInfo.consumeDelay = 8000; g_stConfInfo.consumeDelay = 5;
g_stConfInfo.consumeMsgCnt = 0;
for (int32_t i = 1; i < argc; i++) { for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
...@@ -114,37 +140,20 @@ void parseArgument(int32_t argc, char* argv[]) { ...@@ -114,37 +140,20 @@ void parseArgument(int32_t argc, char* argv[]) {
strcpy(g_stConfInfo.dbName, argv[++i]); strcpy(g_stConfInfo.dbName, argv[++i]);
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]); strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-t") == 0) {
strcpy(g_stConfInfo.topicString, argv[++i]);
} else if (strcmp(argv[i], "-k") == 0) {
strcpy(g_stConfInfo.keyString, argv[++i]);
} else if (strcmp(argv[i], "-t1") == 0) {
strcpy(g_stConfInfo.topicString1, argv[++i]);
} else if (strcmp(argv[i], "-k1") == 0) {
strcpy(g_stConfInfo.keyString1, argv[++i]);
} else if (strcmp(argv[i], "-g") == 0) { } else if (strcmp(argv[i], "-g") == 0) {
g_stConfInfo.showMsgFlag = atol(argv[++i]); g_stConfInfo.showMsgFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-y") == 0) { } else if (strcmp(argv[i], "-y") == 0) {
g_stConfInfo.consumeDelay = atol(argv[++i]); g_stConfInfo.consumeDelay = atol(argv[++i]);
} else if (strcmp(argv[i], "-m") == 0) {
g_stConfInfo.consumeMsgCnt = atol(argv[++i]);
} else if (strcmp(argv[i], "-j") == 0) {
g_stConfInfo.checkMode = atol(argv[++i]);
} else { } else {
printf("%s unknow para: %s %s", GREEN, argv[++i], NC); printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
exit(-1); exit(-1);
} }
} }
if (0 == g_stConfInfo.consumeMsgCnt) { #if 1
g_stConfInfo.consumeMsgCnt = 0x7fffffff;
}
#if 0
pPrint("%s configDir:%s %s", GREEN, configDir, NC); pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC); pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
#endif #endif
} }
...@@ -171,74 +180,26 @@ void ltrim(char* str) { ...@@ -171,74 +180,26 @@ void ltrim(char* str) {
// return str; // return str;
} }
void parseInputString() { static int running = 1;
// printf("topicString: %s\n", g_stConfInfo.topicString); static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) {
// printf("keyString: %s\n\n", g_stConfInfo.keyString); char buf[1024];
char* token; //printf("topic: %s\n", tmq_get_topic_name(msg));
const char delim[2] = ","; //printf("vg:%d\n", tmq_get_vgroup_id(msg));
const char ch = ':'; taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
token = strtok(g_stConfInfo.topicString, delim);
while (token != NULL) { while (1) {
// printf("%s\n", token ); TAOS_ROW row = taos_fetch_row(msg);
strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token); if (row == NULL) break;
ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]); TAOS_FIELD* fields = taos_fetch_fields(msg);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); int32_t numOfFields = taos_field_count(msg);
g_stConfInfo.numOfTopic++; //taos_print_row(buf, row, fields, numOfFields);
//printf("%s\n", buf);
token = strtok(NULL, delim); //taosFprintfFile(g_fp, "%s\n", buf);
}
token = strtok(g_stConfInfo.topicString1, delim);
while (token != NULL) {
// printf("%s\n", token );
strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token);
ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.numOfTopic1++;
token = strtok(NULL, delim);
}
token = strtok(g_stConfInfo.keyString, delim);
while (token != NULL) {
// printf("%s\n", token );
{
char* pstr = token;
ltrim(pstr);
char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret - pstr);
strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret + 1);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey++;
}
token = strtok(NULL, delim);
}
token = strtok(g_stConfInfo.keyString1, delim);
while (token != NULL) {
// printf("%s\n", token );
{
char* pstr = token;
ltrim(pstr);
char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret - pstr);
strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret + 1);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey1++;
}
token = strtok(NULL, delim);
} }
} }
static int running = 1;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
int queryDB(TAOS* taos, char* command) { int queryDB(TAOS* taos, char* command) {
TAOS_RES* pRes = taos_query(taos, command); TAOS_RES* pRes = taos_query(taos, command);
int code = taos_errno(pRes); int code = taos_errno(pRes);
...@@ -252,8 +213,7 @@ int queryDB(TAOS* taos, char* command) { ...@@ -252,8 +213,7 @@ int queryDB(TAOS* taos, char* command) {
return 0; return 0;
} }
tmq_t* build_consumer() { void build_consumer(SThreadInfo *pInfo) {
#if 0
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
...@@ -267,273 +227,229 @@ tmq_t* build_consumer() { ...@@ -267,273 +227,229 @@ tmq_t* build_consumer() {
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
// tmq_conf_set(conf, "group.id", "tg2"); // tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) { for (int32_t i = 0; i < pInfo->numOfKey; i++) {
tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]); tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
} }
tmq_conf_set(conf, "td.connect.user", "root"); pInfo->tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_conf_set(conf, "td.connect.pass", "taosdata"); return;
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
} }
tmq_list_t* build_topic_list() { void build_topic_list(SThreadInfo *pInfo) {
tmq_list_t* topic_list = tmq_list_new(); pInfo->topicList = tmq_list_new();
// tmq_list_append(topic_list, "test_stb_topic_1"); // tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) { for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
tmq_list_append(topic_list, g_stConfInfo.topics[i]); tmq_list_append(pInfo->topicList, pInfo->topics[i]);
} }
return topic_list; return;
} }
tmq_t* build_consumer_x() { int32_t saveConsumeResult(SThreadInfo *pInfo) {
#if 0
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
sprintf(sqlStr, "use %s", g_stConfInfo.dbName); // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)",
g_stConfInfo.dbName,
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->checkresult);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new();
// tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) {
tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]);
}
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list_x() { return 0;
tmq_list_t* topic_list = tmq_list_new();
// tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) {
tmq_list_append(topic_list, g_stConfInfo.topics1[i]);
}
return topic_list;
} }
void loop_consume(tmq_t* tmq) { void loop_consume(SThreadInfo *pInfo) {
tmq_resp_err_t err; tmq_resp_err_t err;
int64_t totalMsgs = 0;
//int64_t totalRows = 0;
int32_t totalMsgs = 0;
int32_t totalRows = 0;
int32_t skipLogNum = 0;
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) { if (tmqMsg) {
totalMsgs++;
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
}
#endif
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqMsg);*/ msg_process(tmqMsg, totalMsgs, 0);
} }
tmq_message_destroy(tmqMsg); tmq_message_destroy(tmqMsg);
} else {
break;
}
}
err = tmq_consumer_close(tmq);
if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
printf("{consume success: %d, %d}", totalMsgs, totalRows);
}
int32_t parallel_consume(tmq_t* tmq, int threadLable) {
tmq_resp_err_t err;
int32_t totalMsgs = 0;
int32_t totalRows = 0;
int32_t skipLogNum = 0;
while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) {
totalMsgs++; totalMsgs++;
// printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs); if (totalMsgs >= pInfo->expectMsgCnt) {
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
}
#endif
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqMsg);*/
}
tmq_message_destroy(tmqMsg);
if (totalMsgs >= g_stConfInfo.consumeMsgCnt) {
break; break;
} }
} else { } else {
break; break;
} }
} }
err = tmq_consumer_close(tmq); err = tmq_consumer_close(pInfo->tmq);
if (err) { if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
// printf("%d", totalMsgs); // output to sim for check result pInfo->consumeMsgCnt = totalMsgs;
return totalMsgs;
} }
void* threadFunc(void* param) { void *consumeThreadFunc(void *param) {
int32_t totalMsgs = 0; int32_t totalMsgs = 0;
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo *pInfo = (SThreadInfo *)param;
tmq_t* tmq = build_consumer_x(); build_consumer(pInfo);
tmq_list_t* topic_list = build_topic_list_x(); build_topic_list(pInfo);
if ((NULL == tmq) || (NULL == topic_list)) { if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){
return NULL; return NULL;
} }
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) { if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
loop_consume(pInfo);
// if (0 == g_stConfInfo.consumeMsgCnt) { err = tmq_unsubscribe(pInfo->tmq);
// loop_consume(tmq);
// } else {
pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
//}
err = tmq_unsubscribe(tmq);
if (err) { if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1; pInfo->consumeMsgCnt = -1;
return NULL; return NULL;
} }
// save consume result into consumeresult table
saveConsumeResult(pInfo);
return NULL; return NULL;
} }
int main(int32_t argc, char* argv[]) { void parseConsumeInfo() {
parseArgument(argc, argv); char* token;
parseInputString(); const char delim[2] = ",";
const char ch = ':';
int32_t numOfThreads = 1; for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
TdThreadAttr thattr; token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
taosThreadAttrInit(&thattr); while (token != NULL) {
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); // printf("%s\n", token );
SThreadInfo* pInfo = (SThreadInfo*)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo)); strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
if (g_stConfInfo.numOfTopic1) { // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
// pthread_create one thread to consume g_stConfInfo.stThreads[i].numOfTopic++;
for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].expectMsgCnt = 0; token = strtok(NULL, delim);
pInfo[i].consumeMsgCnt = 0; }
taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void*)(pInfo + i));
token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
while (token != NULL) {
// printf("%s\n", token );
{
char* pstr = token;
ltrim(pstr);
char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.stThreads[i].numOfKey++;
}
token = strtok(NULL, delim);
} }
} }
}
int32_t totalMsgs = 0; int32_t getConsumeInfo() {
tmq_t* tmq = build_consumer(); char sqlStr[1024] = {0};
tmq_list_t* topic_list = build_topic_list();
if ((NULL == tmq) || (NULL == topic_list)) { TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
return -1; assert(pConn != NULL);
}
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName);
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (err) { if (taos_errno(pRes) != 0) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
exit(-1); exit(-1);
}
TAOS_ROW row = NULL;
int num_fields = taos_num_fields(pRes);
TAOS_FIELD* fields = taos_fetch_fields(pRes);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int
int32_t numOfThread = 0;
while ((row = taos_fetch_row(pRes))) {
int32_t* lengths = taos_fetch_lengths(pRes);
for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) {
continue;
}
if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]);
} else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
} else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
} else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]);
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]);
}
}
numOfThread ++;
} }
g_stConfInfo.numOfThread = numOfThread;
if (0 == g_stConfInfo.numOfTopic1) { taos_free_result(pRes);
loop_consume(tmq);
} else {
totalMsgs = parallel_consume(tmq, 0);
}
err = tmq_unsubscribe(tmq); parseConsumeInfo();
if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
if (g_stConfInfo.numOfTopic1) { return 0;
for (int32_t i = 0; i < numOfThreads; i++) { }
taosThreadJoin(pInfo[i].thread, NULL);
}
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
if (0 == g_stConfInfo.checkMode) { int main(int32_t argc, char* argv[]) {
if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) { parseArgument(argc, argv);
printf("success"); getConsumeInfo();
} else { initLogFile();
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
} TdThreadAttr thattr;
} else if (1 == g_stConfInfo.checkMode) { taosThreadAttrInit(&thattr);
if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) { taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
printf("success");
} else { // pthread_create one thread to consume
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
} taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i])));
} else if (2 == g_stConfInfo.checkMode) { }
if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) {
printf("success"); for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
} else { taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
}
} else if (3 == g_stConfInfo.checkMode) {
if ((totalMsgs == 2 * g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt)) {
printf("success");
} else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
}
} else if (4 == g_stConfInfo.checkMode) {
if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) ||
((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) ||
((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) ||
((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) {
printf("success");
} else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
}
} else {
printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
}
} }
//printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile(g_fp, "\n");
taosCloseFile(&g_fp);
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册