提交 f06407b9 编写于 作者: P plum-lihui

[test: modify consume tool]

上级 5747629c
...@@ -31,46 +31,49 @@ ...@@ -31,46 +31,49 @@
#define NC "\033[0m" #define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b)) #define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024) #define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
typedef struct { typedef struct {
TdThread thread; TdThread thread;
int32_t consumerId; int32_t consumerId;
int32_t ifCheckData; int32_t ifCheckData;
int64_t expectMsgCnt; int64_t expectMsgCnt;
int64_t consumeMsgCnt;
int64_t consumeRowCnt;
int32_t checkresult;
int64_t consumeMsgCnt; char topicString[1024];
int32_t checkresult; char keyString[1024];
char topicString[1024]; int32_t numOfTopic;
char keyString[1024]; char topics[32][64];
int32_t numOfTopic; int32_t numOfKey;
char topics[32][64]; char key[32][64];
char value[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
tmq_t* tmq; tmq_t* tmq;
tmq_list_t* topicList; tmq_list_t* topicList;
} SThreadInfo; } SThreadInfo;
typedef struct { typedef struct {
// input from argvs // input from argvs
char dbName[32]; char cdbName[32];
int32_t showMsgFlag; char dbName[32];
int32_t consumeDelay; // unit s int32_t showMsgFlag;
int32_t numOfThread; int32_t showRowFlag;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; int32_t consumeDelay; // unit s
int32_t numOfThread;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo; } SConfInfo;
static SConfInfo g_stConfInfo; static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL; TdFilePtr g_fp = NULL;
// char* g_pRowValue = NULL; // char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL; // TdFilePtr g_fp = NULL;
...@@ -85,51 +88,62 @@ static void printHelp() { ...@@ -85,51 +88,62 @@ static void printHelp() {
printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default "); printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
printf("%s%s\n", indent, "-g"); printf("%s%s\n", indent, "-g");
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
printf("%s%s\n", indent, "-r");
printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
printf("%s%s\n", indent, "-y"); printf("%s%s\n", indent, "-y");
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
void initLogFile() { void initLogFile() {
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); char file[256];
sprintf(file, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
if (NULL == pFile) { if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
exit - 1; exit -1;
}; };
g_fp = pFile; g_fp = pFile;
}
void saveConfigToLogFile() {
time_t tTime = taosGetTimestampSec(); time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL); struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(pFile, "###################################################################\n"); taosFprintfFile(g_fp, "###################################################################\n");
taosFprintfFile(pFile, "# configDir: %s\n", configDir); taosFprintfFile(g_fp, "# configDir: %s\n", configDir);
taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName); taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName);
taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName);
taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(pFile, " Topics: "); for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
for (int i = 0; i < g_stConfInfo.stThreads[i].numOfTopic; i++) { taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]); taosFprintfFile(g_fp, " Topics: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
} }
taosFprintfFile(pFile, "\n"); taosFprintfFile(g_fp, "\n");
taosFprintfFile(pFile, " Key: "); taosFprintfFile(g_fp, " Key: ");
for (int i = 0; i < g_stConfInfo.stThreads[i].numOfKey; i++) { for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]); taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
} }
taosFprintfFile(pFile, "\n"); taosFprintfFile(g_fp, "\n");
} }
taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
taosFprintfFile(pFile, "###################################################################\n"); taosFprintfFile(g_fp, "###################################################################\n");
} }
void parseArgument(int32_t argc, char* argv[]) { void parseArgument(int32_t argc, char* argv[]) {
memset(&g_stConfInfo, 0, sizeof(SConfInfo)); memset(&g_stConfInfo, 0, sizeof(SConfInfo));
g_stConfInfo.showMsgFlag = 0; g_stConfInfo.showMsgFlag = 0;
g_stConfInfo.showRowFlag = 0;
g_stConfInfo.consumeDelay = 5; g_stConfInfo.consumeDelay = 5;
for (int32_t i = 1; i < argc; i++) { for (int32_t i = 1; i < argc; i++) {
...@@ -138,10 +152,14 @@ void parseArgument(int32_t argc, char* argv[]) { ...@@ -138,10 +152,14 @@ void parseArgument(int32_t argc, char* argv[]) {
exit(0); exit(0);
} else if (strcmp(argv[i], "-d") == 0) { } else if (strcmp(argv[i], "-d") == 0) {
strcpy(g_stConfInfo.dbName, argv[++i]); strcpy(g_stConfInfo.dbName, argv[++i]);
} else if (strcmp(argv[i], "-w") == 0) {
strcpy(g_stConfInfo.cdbName, argv[++i]);
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]); strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-g") == 0) { } else if (strcmp(argv[i], "-g") == 0) {
g_stConfInfo.showMsgFlag = atol(argv[++i]); g_stConfInfo.showMsgFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
g_stConfInfo.showRowFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-y") == 0) { } else if (strcmp(argv[i], "-y") == 0) {
g_stConfInfo.consumeDelay = atol(argv[++i]); g_stConfInfo.consumeDelay = atol(argv[++i]);
} else { } else {
...@@ -150,11 +168,17 @@ void parseArgument(int32_t argc, char* argv[]) { ...@@ -150,11 +168,17 @@ void parseArgument(int32_t argc, char* argv[]) {
} }
} }
initLogFile();
taosFprintfFile(g_fp, "====parseArgument() success\n");
#if 1 #if 1
pPrint("%s configDir:%s %s", GREEN, configDir, NC); pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC); pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
#endif #endif
} }
...@@ -180,24 +204,29 @@ void ltrim(char* str) { ...@@ -180,24 +204,29 @@ void ltrim(char* str) {
// return str; // return str;
} }
static int running = 1; static int running = 1;
static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) { static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) {
char buf[1024]; char buf[1024];
int32_t totalRows = 0;
// printf("topic: %s\n", tmq_get_topic_name(msg)); //printf("topic: %s\n", tmq_get_topic_name(msg));
// printf("vg:%d\n", tmq_get_vgroup_id(msg)); //printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable); taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg); if (0 != g_stConfInfo.showRowFlag) {
int32_t numOfFields = taos_field_count(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
// taos_print_row(buf, row, fields, numOfFields); int32_t numOfFields = taos_field_count(msg);
// printf("%s\n", buf); taos_print_row(buf, row, fields, numOfFields);
// taosFprintfFile(g_fp, "%s\n", buf); taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
}
totalRows++;
} }
return totalRows;
} }
int queryDB(TAOS* taos, char* command) { int queryDB(TAOS* taos, char* command) {
...@@ -213,31 +242,43 @@ int queryDB(TAOS* taos, char* command) { ...@@ -213,31 +242,43 @@ int queryDB(TAOS* taos, char* command) {
return 0; return 0;
} }
void build_consumer(SThreadInfo* pInfo) { static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) {
char sqlStr[1024] = {0}; printf("tmq_commit_cb_print() commit %d\n", resp);
}
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); void build_consumer(SThreadInfo *pInfo) {
assert(pConn != NULL); tmq_conf_t* conf = tmq_conf_new();
sprintf(sqlStr, "use %s", g_stConfInfo.dbName); //tmq_conf_set(conf, "td.connect.ip", "localhost");
TAOS_RES* pRes = taos_query(pConn, sqlStr); //tmq_conf_set(conf, "td.connect.port", "6030");
if (taos_errno(pRes) != 0) { tmq_conf_set(conf, "td.connect.user", "root");
printf("error in use db, reason:%s\n", taos_errstr(pRes)); tmq_conf_set(conf, "td.connect.pass", "taosdata");
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
// tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
// tmq_conf_set(conf, "group.id", "cgrp1");
for (int32_t i = 0; i < pInfo->numOfKey; i++) { for (int32_t i = 0; i < pInfo->numOfKey; i++) {
tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
} }
//tmq_conf_set(conf, "client.id", "c-001");
//tmq_conf_set(conf, "enable.auto.commit", "true");
//tmq_conf_set(conf, "enable.auto.commit", "false");
//tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
//tmq_conf_set(conf, "auto.offset.reset", "none");
//tmq_conf_set(conf, "auto.offset.reset", "earliest");
//tmq_conf_set(conf, "auto.offset.reset", "latest");
pInfo->tmq = tmq_consumer_new(conf, NULL, 0); pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
return; return;
} }
void build_topic_list(SThreadInfo* pInfo) { void build_topic_list(SThreadInfo *pInfo) {
pInfo->topicList = tmq_list_new(); pInfo->topicList = tmq_list_new();
// tmq_list_append(topic_list, "test_stb_topic_1"); // tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < pInfo->numOfTopic; i++) { for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
...@@ -246,45 +287,49 @@ void build_topic_list(SThreadInfo* pInfo) { ...@@ -246,45 +287,49 @@ void build_topic_list(SThreadInfo* pInfo) {
return; return;
} }
int32_t saveConsumeResult(SThreadInfo* pInfo) { int32_t saveConsumeResult(SThreadInfo *pInfo) {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)", g_stConfInfo.dbName, sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)",
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->checkresult); g_stConfInfo.cdbName,
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->consumeRowCnt,
pInfo->checkresult);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
return 0; return 0;
} }
void loop_consume(SThreadInfo* pInfo) { void loop_consume(SThreadInfo *pInfo) {
tmq_resp_err_t err; tmq_resp_err_t err;
int64_t totalMsgs = 0; int64_t totalMsgs = 0;
// int64_t totalRows = 0; int64_t totalRows = 0;
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) { if (tmqMsg) {
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqMsg, totalMsgs, 0); totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId);
} }
taos_free_result(tmqMsg); taos_free_result(tmqMsg);
totalMsgs++; totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) { if (totalMsgs >= pInfo->expectMsgCnt) {
break; break;
} }
...@@ -292,7 +337,7 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -292,7 +337,7 @@ void loop_consume(SThreadInfo* pInfo) {
break; break;
} }
} }
err = tmq_consumer_close(pInfo->tmq); err = tmq_consumer_close(pInfo->tmq);
if (err) { if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
...@@ -300,34 +345,38 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -300,34 +345,38 @@ void loop_consume(SThreadInfo* pInfo) {
} }
pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeMsgCnt = totalMsgs;
pInfo->consumeRowCnt = totalRows;
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %"PRId64", consumeRowCnt: %"PRId64"\n", pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
} }
void* consumeThreadFunc(void* param) { void *consumeThreadFunc(void *param) {
int32_t totalMsgs = 0; int32_t totalMsgs = 0;
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo *pInfo = (SThreadInfo *)param;
build_consumer(pInfo); build_consumer(pInfo);
build_topic_list(pInfo); build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){
return NULL; return NULL;
} }
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) { if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
loop_consume(pInfo); loop_consume(pInfo);
err = tmq_unsubscribe(pInfo->tmq); err = tmq_unsubscribe(pInfo->tmq);
if (err) { if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1; pInfo->consumeMsgCnt = -1;
return NULL; return NULL;
} }
// save consume result into consumeresult table // save consume result into consumeresult table
saveConsumeResult(pInfo); saveConsumeResult(pInfo);
...@@ -339,7 +388,7 @@ void parseConsumeInfo() { ...@@ -339,7 +388,7 @@ void parseConsumeInfo() {
const char delim[2] = ","; const char delim[2] = ",";
const char ch = ':'; const char ch = ':';
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
token = strtok(g_stConfInfo.stThreads[i].topicString, delim); token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
while (token != NULL) { while (token != NULL) {
// printf("%s\n", token ); // printf("%s\n", token );
...@@ -347,10 +396,10 @@ void parseConsumeInfo() { ...@@ -347,10 +396,10 @@ void parseConsumeInfo() {
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.stThreads[i].numOfTopic++; g_stConfInfo.stThreads[i].numOfTopic++;
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
token = strtok(g_stConfInfo.stThreads[i].keyString, delim); token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
while (token != NULL) { while (token != NULL) {
// printf("%s\n", token ); // printf("%s\n", token );
...@@ -364,7 +413,7 @@ void parseConsumeInfo() { ...@@ -364,7 +413,7 @@ void parseConsumeInfo() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]); // g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.stThreads[i].numOfKey++; g_stConfInfo.stThreads[i].numOfKey++;
} }
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
} }
...@@ -372,47 +421,48 @@ void parseConsumeInfo() { ...@@ -372,47 +421,48 @@ void parseConsumeInfo() {
int32_t getConsumeInfo() { int32_t getConsumeInfo() {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName); sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosCloseFile(&g_fp);
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
int num_fields = taos_num_fields(pRes); int num_fields = taos_num_fields(pRes);
TAOS_FIELD* fields = taos_fetch_fields(pRes); TAOS_FIELD* fields = taos_fetch_fields(pRes);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int
// ifcheckdata int
int32_t numOfThread = 0; int32_t numOfThread = 0;
while ((row = taos_fetch_row(pRes))) { while ((row = taos_fetch_row(pRes))) {
int32_t* lengths = taos_fetch_lengths(pRes); int32_t* lengths = taos_fetch_lengths(pRes);
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) { if (row[i] == NULL || 0 == i) {
continue; continue;
} }
if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]); g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]);
} else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
} else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]); memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
} else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]); g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]);
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]); g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]);
} }
} }
numOfThread++; numOfThread ++;
} }
g_stConfInfo.numOfThread = numOfThread; g_stConfInfo.numOfThread = numOfThread;
...@@ -423,10 +473,11 @@ int32_t getConsumeInfo() { ...@@ -423,10 +473,11 @@ int32_t getConsumeInfo() {
return 0; return 0;
} }
int main(int32_t argc, char* argv[]) { int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv); parseArgument(argc, argv);
getConsumeInfo(); getConsumeInfo();
initLogFile(); saveConfigToLogFile();
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); taosThreadAttrInit(&thattr);
...@@ -434,19 +485,18 @@ int main(int32_t argc, char* argv[]) { ...@@ -434,19 +485,18 @@ int main(int32_t argc, char* argv[]) {
// pthread_create one thread to consume // pthread_create one thread to consume
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i])));
(void*)(&(g_stConfInfo.stThreads[i])));
} }
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
} }
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); //printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, "\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册