提交 41d412fb 编写于 作者: P plum-lihui

test: modify consume processor

上级 31b20998
......@@ -61,6 +61,10 @@ typedef struct {
tmq_t* tmq;
tmq_list_t* topicList;
int32_t numOfVgroups;
int32_t rowsOfPerVgroups[32][2]; // [i][0]: vgroup id, [i][1]: rows of consume
int64_t ts;
} SThreadInfo;
......@@ -69,7 +73,8 @@ typedef struct {
char cdbName[32];
char dbName[32];
int32_t showMsgFlag;
int32_t showRowFlag;
int32_t showRowFlag;
int32_t saveRowFlag;
int32_t consumeDelay; // unit s
int32_t numOfThread;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
......@@ -77,6 +82,7 @@ typedef struct {
static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL;
static int running = 1;
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
......@@ -93,6 +99,8 @@ static void printHelp() {
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
printf("%s%s\n", indent, "-r");
printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
printf("%s%s\n", indent, "-s");
printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
printf("%s%s\n", indent, "-y");
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
exit(EXIT_SUCCESS);
......@@ -135,6 +143,7 @@ void saveConfigToLogFile() {
taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName);
taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag);
taosFprintfFile(g_fp, "# saveRowFlag: %d\n", g_stConfInfo.saveRowFlag);
taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread);
......@@ -165,6 +174,7 @@ void parseArgument(int32_t argc, char* argv[]) {
memset(&g_stConfInfo, 0, sizeof(SConfInfo));
g_stConfInfo.showMsgFlag = 0;
g_stConfInfo.showRowFlag = 0;
g_stConfInfo.saveRowFlag = 0;
g_stConfInfo.consumeDelay = 5;
for (int32_t i = 1; i < argc; i++) {
......@@ -181,6 +191,8 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo.showMsgFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
g_stConfInfo.showRowFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-s") == 0) {
g_stConfInfo.saveRowFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-y") == 0) {
g_stConfInfo.consumeDelay = atol(argv[++i]);
} else {
......@@ -200,6 +212,7 @@ void parseArgument(int32_t argc, char* argv[]) {
pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
#endif
}
......@@ -225,15 +238,52 @@ void ltrim(char* str) {
// return str;
}
static int running = 1;
static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) {
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
int32_t i;
for (i = 0; i < pInfo->numOfVgroups; i++) {
if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
pInfo->rowsOfPerVgroups[i][1] += rows;
}
}
if (i == pInfo->numOfVgroups) {
pInfo->rowsOfPerVgroups[i][0] = vgroupId;
pInfo->rowsOfPerVgroups[i][1] += rows;
pInfo->numOfVgroups++;
}
}
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
sprintf(sqlStr, "insert into %s.content_%d values (%"PRId64", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->ts++, buf);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error in insert consume result, reason:%s\n", taos_errstr(pRes));
taosCloseFile(&g_fp);
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
return 0;
}
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t threadLable) {
char buf[1024];
int32_t totalRows = 0;
// printf("topic: %s\n", tmq_get_topic_name(msg));
// printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
int32_t vgroupId = tmq_get_vgroup_id(msg);
taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", threadLable, pInfo->consumerId);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId, tmq_get_table_name(msg));
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
......@@ -247,11 +297,16 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
if (0 != g_stConfInfo.saveRowFlag) {
saveConsumeContentToTbl(pInfo, buf);
}
}
totalRows++;
}
addRowsToVgroupId(pInfo, vgroupId, totalRows);
return totalRows;
}
......@@ -344,6 +399,32 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
taos_free_result(pRes);
#if 0
// vgroups
for (i = 0; i < pInfo->numOfVgroups; i++) {
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.vgroup_%d values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)",
g_stConfInfo.cdbName,
now,
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->consumeRowCnt,
pInfo->checkresult);
char tmpString[128];
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
}
#endif
return 0;
}
......@@ -356,11 +437,13 @@ void loop_consume(SThreadInfo* pInfo) {
char tmpString[128];
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
pInfo->ts = taosGetTimestampMs();
while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) {
if (0 != g_stConfInfo.showMsgFlag) {
totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId);
totalRows += msg_process(tmqMsg, pInfo, totalMsgs);
}
taos_free_result(tmqMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册