提交 4cd660b3 编写于 作者: S Shengliang Guan

refactor: format tmqSim.c

上级 f08a183a
...@@ -14,13 +14,13 @@ ...@@ -14,13 +14,13 @@
*/ */
#include <assert.h> #include <assert.h>
#include <math.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <time.h> #include <time.h>
#include <math.h>
#include "taos.h" #include "taos.h"
#include "taosdef.h" #include "taosdef.h"
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32) #define MAX_VGROUP_CNT (32)
#define SEND_TIME_UNIT 10 // ms #define SEND_TIME_UNIT 10 // ms
#define MAX_SQL_LEN 1048576 #define MAX_SQL_LEN 1048576
typedef enum { typedef enum {
...@@ -45,11 +45,7 @@ typedef enum { ...@@ -45,11 +45,7 @@ typedef enum {
NOTIFY_CMD_ID_BUTT, NOTIFY_CMD_ID_BUTT,
} NOTIFY_CMD_ID; } NOTIFY_CMD_ID;
typedef enum enumQUERY_TYPE { typedef enum enumQUERY_TYPE { NO_INSERT_TYPE, INSERT_TYPE, QUERY_TYPE_BUT } QUERY_TYPE;
NO_INSERT_TYPE,
INSERT_TYPE,
QUERY_TYPE_BUT
} QUERY_TYPE;
typedef struct { typedef struct {
TdThread thread; TdThread thread;
...@@ -61,7 +57,7 @@ typedef struct { ...@@ -61,7 +57,7 @@ typedef struct {
// char autoOffsetRest[16]; // none, earliest, latest // char autoOffsetRest[16]; // none, earliest, latest
TdFilePtr pConsumeRowsFile; TdFilePtr pConsumeRowsFile;
TdFilePtr pConsumeMetaFile; TdFilePtr pConsumeMetaFile;
int32_t ifCheckData; int32_t ifCheckData;
int64_t expectMsgCnt; int64_t expectMsgCnt;
...@@ -87,12 +83,12 @@ typedef struct { ...@@ -87,12 +83,12 @@ typedef struct {
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
int64_t ts; int64_t ts;
TAOS* taos; TAOS* taos;
// below parameters is used by omb test // below parameters is used by omb test
int32_t producerRate; // unit: msgs/s int32_t producerRate; // unit: msgs/s
int64_t totalProduceMsgs; int64_t totalProduceMsgs;
int64_t totalMsgsLen; int64_t totalMsgsLen;
} SThreadInfo; } SThreadInfo;
...@@ -112,12 +108,12 @@ typedef struct { ...@@ -112,12 +108,12 @@ typedef struct {
SThreadInfo stProdThreads[MAX_CONSUMER_THREAD_CNT]; SThreadInfo stProdThreads[MAX_CONSUMER_THREAD_CNT];
// below parameters is used by omb test // below parameters is used by omb test
char topic[64]; char topic[64];
int32_t producers; int32_t producers;
int32_t producerRate; int32_t producerRate;
int32_t runDurationMinutes; int32_t runDurationMinutes;
int32_t batchSize; int32_t batchSize;
int32_t payloadLen; int32_t payloadLen;
} SConfInfo; } SConfInfo;
static SConfInfo g_stConfInfo; static SConfInfo g_stConfInfo;
...@@ -146,14 +142,13 @@ static void printHelp() { ...@@ -146,14 +142,13 @@ static void printHelp() {
printf("%s%s%s%ds\n", indent, indent, "consume delay, default is ", g_stConfInfo.consumeDelay); printf("%s%s%s%ds\n", indent, indent, "consume delay, default is ", g_stConfInfo.consumeDelay);
printf("%s%s\n", indent, "-e"); printf("%s%s\n", indent, "-e");
printf("%s%s%s%d\n", indent, indent, "snapshot, default is ", g_stConfInfo.useSnapshot); printf("%s%s%s%d\n", indent, indent, "snapshot, default is ", g_stConfInfo.useSnapshot);
printf("%s%s\n", indent, "-t"); printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent, "topic name, default is null"); printf("%s%s%s\n", indent, indent, "topic name, default is null");
printf("%s%s\n", indent, "-x"); printf("%s%s\n", indent, "-x");
printf("%s%s%s\n", indent, indent, "consume thread number, default is 1"); printf("%s%s%s\n", indent, indent, "consume thread number, default is 1");
printf("%s%s\n", indent, "-l"); printf("%s%s\n", indent, "-l");
printf("%s%s%s%d\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes); printf("%s%s%s%d\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes);
printf("%s%s\n", indent, "-p"); printf("%s%s\n", indent, "-p");
...@@ -165,7 +160,6 @@ static void printHelp() { ...@@ -165,7 +160,6 @@ static void printHelp() {
printf("%s%s\n", indent, "-n"); printf("%s%s\n", indent, "-n");
printf("%s%s%s\n", indent, indent, "payload len unit is byte, default is 1000"); printf("%s%s%s\n", indent, indent, "payload len unit is byte, default is 1000");
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
...@@ -194,7 +188,7 @@ void initLogFile() { ...@@ -194,7 +188,7 @@ void initLogFile() {
pid_t process_id = getpid(); pid_t process_id = getpid();
if (0 != strlen(g_stConfInfo.topic)) { if (0 != strlen(g_stConfInfo.topic)) {
sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString)); sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString));
} else { } else {
sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString)); sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
} }
...@@ -294,7 +288,7 @@ void parseArgument(int32_t argc, char* argv[]) { ...@@ -294,7 +288,7 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo.producerRate = atol(argv[++i]); g_stConfInfo.producerRate = atol(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) { } else if (strcmp(argv[i], "-n") == 0) {
g_stConfInfo.payloadLen = atol(argv[++i]); g_stConfInfo.payloadLen = atol(argv[++i]);
if(g_stConfInfo.payloadLen <= 0 || g_stConfInfo.payloadLen > 1024 * 1024 * 1024){ if (g_stConfInfo.payloadLen <= 0 || g_stConfInfo.payloadLen > 1024 * 1024 * 1024) {
pError("%s calloc size is too large: %s %s", GREEN, argv[++i], NC); pError("%s calloc size is too large: %s %s", GREEN, argv[++i], NC);
exit(-1); exit(-1);
} }
...@@ -357,9 +351,9 @@ void ltrim(char* str) { ...@@ -357,9 +351,9 @@ void ltrim(char* str) {
} }
int queryDB(TAOS* taos, char* command) { int queryDB(TAOS* taos, char* command) {
int retryCnt = 10; int retryCnt = 10;
int code = 0; int code = 0;
TAOS_RES* pRes = NULL; TAOS_RES* pRes = NULL;
while (retryCnt--) { while (retryCnt--) {
pRes = taos_query(taos, command); pRes = taos_query(taos, command);
...@@ -379,7 +373,6 @@ int queryDB(TAOS* taos, char* command) { ...@@ -379,7 +373,6 @@ int queryDB(TAOS* taos, char* command) {
return -1; return -1;
} }
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) { void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
int32_t i; int32_t i;
for (i = 0; i < pInfo->numOfVgroups; i++) { for (i = 0; i < pInfo->numOfVgroups; i++) {
...@@ -403,22 +396,21 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) { ...@@ -403,22 +396,21 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
} }
TAOS* createNewTaosConnect() { TAOS* createNewTaosConnect() {
TAOS* taos = NULL; TAOS* taos = NULL;
int32_t retryCnt = 10; int32_t retryCnt = 10;
while (retryCnt--) { while (retryCnt--) {
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (NULL != taos) { if (NULL != taos) {
return taos; return taos;
} }
taosSsleep(1); taosSsleep(1);
} }
taosFprintfFile(g_fp, "taos_connect() fail\n"); taosFprintfFile(g_fp, "taos_connect() fail\n");
return NULL; return NULL;
} }
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) { int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
char sqlStr[1100] = {0}; char sqlStr[1100] = {0};
...@@ -440,7 +432,7 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) { ...@@ -440,7 +432,7 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
if (retCode != 0) { if (retCode != 0) {
taosFprintfFile(g_fp, "error in save consume content\n"); taosFprintfFile(g_fp, "error in save consume content\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
taos_close(pConn); taos_close(pConn);
exit(-1); exit(-1);
} }
...@@ -481,7 +473,7 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) { ...@@ -481,7 +473,7 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
struct tm ptm; struct tm ptm;
taosLocalTime(&tt, &ptm); taosLocalTime(&tt, &ptm);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms); sprintf(buf + pos, ".%09d", ms);
...@@ -548,22 +540,20 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f ...@@ -548,22 +540,20 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_JSON: {
{ int32_t bufIndex = 0;
int32_t bufIndex = 0; for (int32_t i = 0; i < length; i++) {
for (int32_t i = 0; i < length; i++) { buf[bufIndex] = val[i];
bufIndex++;
if (val[i] == '\"') {
buf[bufIndex] = val[i]; buf[bufIndex] = val[i];
bufIndex++; bufIndex++;
if (val[i] == '\"') {
buf[bufIndex] = val[i];
bufIndex++;
}
} }
buf[bufIndex] = 0;
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
} }
break; buf[bufIndex] = 0;
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
} break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
shellFormatTimestamp(buf, *(int64_t*)val, precision); shellFormatTimestamp(buf, *(int64_t*)val, precision);
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr); taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
...@@ -635,7 +625,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn ...@@ -635,7 +625,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
return totalRows; return totalRows;
} }
static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) { static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
char buf[1024]; char buf[1024];
int32_t totalRows = 0; int32_t totalRows = 0;
...@@ -650,24 +639,24 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn ...@@ -650,24 +639,24 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
{ {
tmq_raw_data raw = {0}; tmq_raw_data raw = {0};
int32_t code = tmq_get_raw(msg, &raw); int32_t code = tmq_get_raw(msg, &raw);
if(code == TSDB_CODE_SUCCESS){ if (code == TSDB_CODE_SUCCESS) {
// int retCode = queryDB(pInfo->taos, "use metadb"); // int retCode = queryDB(pInfo->taos, "use metadb");
// if (retCode != 0) { // if (retCode != 0) {
// taosFprintfFile(g_fp, "error when use metadb\n"); // taosFprintfFile(g_fp, "error when use metadb\n");
// taosCloseFile(&g_fp); // taosCloseFile(&g_fp);
// exit(-1); // exit(-1);
// } // }
// taosFprintfFile(g_fp, "raw:%p\n", &raw); // taosFprintfFile(g_fp, "raw:%p\n", &raw);
// //
// tmq_write_raw(pInfo->taos, raw); // tmq_write_raw(pInfo->taos, raw);
} }
char* result = tmq_get_json_meta(msg); char* result = tmq_get_json_meta(msg);
if(result && strcmp(result, "") != 0){ if (result && strcmp(result, "") != 0) {
//printf("meta result: %s\n", result); // printf("meta result: %s\n", result);
taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result); taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result);
} }
tmq_free_json_meta(result); tmq_free_json_meta(result);
} }
...@@ -683,8 +672,8 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { ...@@ -683,8 +672,8 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId, sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName,
pInfo->consumerId); atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId, pInfo->consumerId);
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL); taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
...@@ -695,15 +684,15 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { ...@@ -695,15 +684,15 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
static int32_t g_once_commit_flag = 0; static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) { if (0 == g_once_commit_flag) {
g_once_commit_flag = 1; g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
} }
char tmpString[128]; char tmpString[128];
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
} }
void build_consumer(SThreadInfo* pInfo) { void build_consumer(SThreadInfo* pInfo) {
...@@ -768,7 +757,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { ...@@ -768,7 +757,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
int retCode = queryDB(pInfo->taos, sqlStr); int retCode = queryDB(pInfo->taos, sqlStr);
if (retCode != 0) { if (retCode != 0) {
taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId); taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId);
return -1; return -1;
} }
...@@ -797,9 +786,9 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -797,9 +786,9 @@ void loop_consume(SThreadInfo* pInfo) {
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId); sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
sprintf(filename, "%s/../log/meta_consumerid_%d.txt", configDir, pInfo->consumerId); sprintf(filename, "%s/../log/meta_consumerid_%d.txt", configDir, pInfo->consumerId);
pInfo->pConsumeMetaFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); pInfo->pConsumeMetaFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (pInfo->pConsumeRowsFile == NULL || pInfo->pConsumeMetaFile == NULL) { if (pInfo->pConsumeRowsFile == NULL || pInfo->pConsumeMetaFile == NULL) {
taosFprintfFile(g_fp, "%s create file fail for save rows or save meta\n", getCurrentTimeString(tmpString)); taosFprintfFile(g_fp, "%s create file fail for save rows or save meta\n", getCurrentTimeString(tmpString));
return; return;
...@@ -815,15 +804,15 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -815,15 +804,15 @@ void loop_consume(SThreadInfo* pInfo) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
if (tmqMsg) { if (tmqMsg) {
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
tmq_res_t msgType = tmq_get_res_type(tmqMsg); tmq_res_t msgType = tmq_get_res_type(tmqMsg);
if (msgType == TMQ_RES_TABLE_META) { if (msgType == TMQ_RES_TABLE_META) {
totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs); totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs);
} else if (msgType == TMQ_RES_DATA){ } else if (msgType == TMQ_RES_DATA) {
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
} else if (msgType == TMQ_RES_METADATA){ } else if (msgType == TMQ_RES_METADATA) {
meta_msg_process(tmqMsg, pInfo, totalMsgs); meta_msg_process(tmqMsg, pInfo, totalMsgs);
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
} }
} }
taos_free_result(tmqMsg); taos_free_result(tmqMsg);
...@@ -865,7 +854,7 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -865,7 +854,7 @@ void loop_consume(SThreadInfo* pInfo) {
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n", taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
taosFsyncFile(pInfo->pConsumeRowsFile); taosFsyncFile(pInfo->pConsumeRowsFile);
taosCloseFile(&pInfo->pConsumeRowsFile); taosCloseFile(&pInfo->pConsumeRowsFile);
} }
...@@ -882,8 +871,8 @@ void* consumeThreadFunc(void* param) { ...@@ -882,8 +871,8 @@ void* consumeThreadFunc(void* param) {
build_consumer(pInfo); build_consumer(pInfo);
build_topic_list(pInfo); build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n"); taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
taos_close(pInfo->taos); taos_close(pInfo->taos);
pInfo->taos = NULL; pInfo->taos = NULL;
return NULL; return NULL;
} }
...@@ -892,7 +881,7 @@ void* consumeThreadFunc(void* param) { ...@@ -892,7 +881,7 @@ void* consumeThreadFunc(void* param) {
if (err != 0) { if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err)); taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
taos_close(pInfo->taos); taos_close(pInfo->taos);
pInfo->taos = NULL; pInfo->taos = NULL;
return NULL; return NULL;
} }
...@@ -947,7 +936,8 @@ void parseConsumeInfo() { ...@@ -947,7 +936,8 @@ void parseConsumeInfo() {
token = strtok(g_stConfInfo.stThreads[i].topicString, delim); token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
while (token != NULL) { while (token != NULL) {
// printf("%s\n", token ); // printf("%s\n", token );
tstrncpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token, sizeof(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic])); tstrncpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token,
sizeof(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]));
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.stThreads[i].numOfTopic++; g_stConfInfo.stThreads[i].numOfTopic++;
...@@ -963,7 +953,8 @@ void parseConsumeInfo() { ...@@ -963,7 +953,8 @@ void parseConsumeInfo() {
ltrim(pstr); ltrim(pstr);
char* ret = strchr(pstr, ch); char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr); memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
tstrncpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1, sizeof(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey])); tstrncpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1,
sizeof(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey]));
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]); // g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.stThreads[i].numOfKey++; g_stConfInfo.stThreads[i].numOfKey++;
...@@ -984,12 +975,12 @@ int32_t getConsumeInfo() { ...@@ -984,12 +975,12 @@ int32_t getConsumeInfo() {
} }
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName); sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
TAOS_RES *pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
taosFprintfFile(g_fp, "error in get consumeinfo for %s\n", taos_errstr(pRes)); taosFprintfFile(g_fp, "error in get consumeinfo for %s\n", taos_errstr(pRes));
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
return -1; return -1;
} }
...@@ -1040,19 +1031,18 @@ int32_t getConsumeInfo() { ...@@ -1040,19 +1031,18 @@ int32_t getConsumeInfo() {
return 0; return 0;
} }
static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex, int64_t* lenOfRows) { static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex, int64_t* lenOfRows) {
char buf[16*1024]; char buf[16 * 1024];
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t totalLen = 0; int32_t totalLen = 0;
// printf("topic: %s\n", tmq_get_topic_name(msg)); // printf("topic: %s\n", tmq_get_topic_name(msg));
//int32_t vgroupId = tmq_get_vgroup_id(msg); // int32_t vgroupId = tmq_get_vgroup_id(msg);
//const char* dbName = tmq_get_db_name(msg); // const char* dbName = tmq_get_db_name(msg);
//taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex); // taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
//taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", // taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
// tmq_get_topic_name(msg), vgroupId); // tmq_get_topic_name(msg), vgroupId);
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
...@@ -1061,9 +1051,9 @@ static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t m ...@@ -1061,9 +1051,9 @@ static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t m
TAOS_FIELD* fields = taos_fetch_fields(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
//int32_t* length = taos_fetch_lengths(msg); // int32_t* length = taos_fetch_lengths(msg);
//int32_t precision = taos_result_precision(msg); // int32_t precision = taos_result_precision(msg);
//const char* tbName = tmq_get_table_name(msg); // const char* tbName = tmq_get_table_name(msg);
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
totalLen += strlen(buf); totalLen += strlen(buf);
...@@ -1085,8 +1075,7 @@ void omb_loop_consume(SThreadInfo* pInfo) { ...@@ -1085,8 +1075,7 @@ void omb_loop_consume(SThreadInfo* pInfo) {
char tmpString[128]; char tmpString[128];
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
pInfo->consumerId); pInfo->consumerId);
printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
pInfo->consumerId);
pInfo->ts = taosGetTimestampMs(); pInfo->ts = taosGetTimestampMs();
...@@ -1094,55 +1083,55 @@ void omb_loop_consume(SThreadInfo* pInfo) { ...@@ -1094,55 +1083,55 @@ void omb_loop_consume(SThreadInfo* pInfo) {
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
int64_t totalLenOfMsg = 0; int64_t totalLenOfMsg = 0;
int64_t lastTotalLenOfMsg = 0; int64_t lastTotalLenOfMsg = 0;
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
if (tmqMsg) { if (tmqMsg) {
int64_t lenOfMsg = 0; int64_t lenOfMsg = 0;
totalRows += omb_data_msg_process(tmqMsg, pInfo, totalMsgs, &lenOfMsg); totalRows += omb_data_msg_process(tmqMsg, pInfo, totalMsgs, &lenOfMsg);
totalLenOfMsg += lenOfMsg; totalLenOfMsg += lenOfMsg;
taos_free_result(tmqMsg); taos_free_result(tmqMsg);
totalMsgs++; totalMsgs++;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 10 * 1000) { if (currentPrintTime - lastPrintTime > 10 * 1000) {
int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg; int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
int64_t deltaTime = currentPrintTime - lastPrintTime; int64_t deltaTime = currentPrintTime - lastPrintTime;
printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n", printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64
pInfo->consumerId, totalRows, totalMsgs, ", rate: %.3f msgs/s, %.1f MB/s\n",
(totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, pInfo->consumerId, totalRows, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
currentLenOfMsg*1000.0/(1024*1024)/deltaTime); currentLenOfMsg * 1000.0 / (1024 * 1024) / deltaTime);
taosFprintfFile( taosFprintfFile(g_fp,
g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period cons rate: %.3f msgs/s, %.1f MB/s\n", "consumer id %d has currently poll total msgs: %" PRId64
pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, currentLenOfMsg*1000.0/deltaTime); ", period cons rate: %.3f msgs/s, %.1f MB/s\n",
pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
currentLenOfMsg * 1000.0 / deltaTime);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
lastTotalMsgs = totalMsgs; lastTotalMsgs = totalMsgs;
lastTotalLenOfMsg = totalLenOfMsg; lastTotalLenOfMsg = totalLenOfMsg;
} }
} else { } else {
char tmpString[128]; char tmpString[128];
taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg; int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
int64_t deltaTime = currentPrintTime - lastPrintTime; int64_t deltaTime = currentPrintTime - lastPrintTime;
printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n", printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64
pInfo->consumerId, totalRows, totalMsgs, ", rate: %.3f msgs/s, %.1f MB/s\n",
(totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, pInfo->consumerId, totalRows, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
currentLenOfMsg*1000.0/(1024*1024)/deltaTime); currentLenOfMsg * 1000.0 / (1024 * 1024) / deltaTime);
break; break;
} }
} }
pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeMsgCnt = totalMsgs;
pInfo->consumeRowCnt = totalRows; pInfo->consumeRowCnt = totalRows;
pInfo->consumeLen = totalLenOfMsg; pInfo->consumeLen = totalLenOfMsg;
} }
void* ombConsumeThreadFunc(void* param) { void* ombConsumeThreadFunc(void* param) {
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo* pInfo = (SThreadInfo*)param;
...@@ -1209,26 +1198,24 @@ void* ombConsumeThreadFunc(void* param) { ...@@ -1209,26 +1198,24 @@ void* ombConsumeThreadFunc(void* param) {
return NULL; return NULL;
} }
static int queryDbExec(TAOS* taos, char* command, QUERY_TYPE type) {
TAOS_RES* res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC);
taos_free_result(res);
return -1;
}
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) { if (INSERT_TYPE == type) {
TAOS_RES *res = taos_query(taos, command); int affectedRows = taos_affected_rows(res);
int32_t code = taos_errno(res);
if (code != 0) {
pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC);
taos_free_result(res);
return -1;
}
if (INSERT_TYPE == type) {
int affectedRows = taos_affected_rows(res);
taos_free_result(res);
return affectedRows;
}
taos_free_result(res); taos_free_result(res);
return 0; return affectedRows;
}
taos_free_result(res);
return 0;
} }
void* ombProduceThreadFunc(void* param) { void* ombProduceThreadFunc(void* param) {
...@@ -1236,101 +1223,100 @@ void* ombProduceThreadFunc(void* param) { ...@@ -1236,101 +1223,100 @@ void* ombProduceThreadFunc(void* param) {
pInfo->taos = createNewTaosConnect(); pInfo->taos = createNewTaosConnect();
if (pInfo->taos == NULL) { if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n"); taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n");
return NULL; return NULL;
} }
int64_t affectedRowsTotal = 0; int64_t affectedRowsTotal = 0;
int64_t sendMsgs = 0; int64_t sendMsgs = 0;
uint32_t totalSendLoopTimes = g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT; // send some msgs per 10ms uint32_t totalSendLoopTimes =
uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize; g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT; // send some msgs per 10ms
uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize; uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize;
uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize;
if (remainder) { if (remainder) {
batchPerTblTimes += 1; batchPerTblTimes += 1;
} }
char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN); char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN);
if (NULL == sqlBuf) { if (NULL == sqlBuf) {
printf("malloc fail for sqlBuf\n"); printf("malloc fail for sqlBuf\n");
taos_close(pInfo->taos); taos_close(pInfo->taos);
pInfo->taos = NULL; pInfo->taos = NULL;
return NULL; return NULL;
} }
printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes, batchPerTblTimes, pInfo->producerRate); printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes,
batchPerTblTimes, pInfo->producerRate);
char ctbName[128] = {0}; char ctbName[128] = {0};
sprintf(ctbName, "%s.ctb%d", g_stConfInfo.dbName, pInfo->consumerId); sprintf(ctbName, "%s.ctb%d", g_stConfInfo.dbName, pInfo->consumerId);
int64_t lastPrintTime = taosGetTimestampUs(); int64_t lastPrintTime = taosGetTimestampUs();
int64_t totalMsgLen = 0; int64_t totalMsgLen = 0;
//int64_t timeStamp = taosGetTimestampUs(); // int64_t timeStamp = taosGetTimestampUs();
while (totalSendLoopTimes) { while (totalSendLoopTimes) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
for (int i = 0; i < batchPerTblTimes; ++i) { for (int i = 0; i < batchPerTblTimes; ++i) {
uint32_t msgsOfSql = g_stConfInfo.batchSize; uint32_t msgsOfSql = g_stConfInfo.batchSize;
if ((i == batchPerTblTimes - 1) && (0 != remainder)) { if ((i == batchPerTblTimes - 1) && (0 != remainder)) {
msgsOfSql = remainder; msgsOfSql = remainder;
} }
int len = 0; int len = 0;
len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "insert into %s values ", ctbName); len += snprintf(sqlBuf + len, MAX_SQL_LEN - len, "insert into %s values ", ctbName);
for (int j = 0; j < msgsOfSql; j++) { for (int j = 0; j < msgsOfSql; j++) {
int64_t timeStamp = taosGetTimestampNs(); int64_t timeStamp = taosGetTimestampNs();
len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload); len += snprintf(sqlBuf + len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload);
sendMsgs++; sendMsgs++;
pInfo->totalProduceMsgs++; pInfo->totalProduceMsgs++;
} }
totalMsgLen += len; totalMsgLen += len;
pInfo->totalMsgsLen += len; pInfo->totalMsgsLen += len;
int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE); int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
if (affectedRows < 0) { if (affectedRows < 0) {
taos_close(pInfo->taos); taos_close(pInfo->taos);
pInfo->taos = NULL; pInfo->taos = NULL;
taosMemoryFree(sqlBuf); taosMemoryFree(sqlBuf);
return NULL; return NULL;
} }
affectedRowsTotal += affectedRows; affectedRowsTotal += affectedRows;
//printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows); // printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows);
} }
totalSendLoopTimes -= 1; totalSendLoopTimes -= 1;
// calc spent time // calc spent time
int64_t currentTs = taosGetTimestampUs(); int64_t currentTs = taosGetTimestampUs();
int64_t delta = currentTs - startTs; int64_t delta = currentTs - startTs;
if (delta < SEND_TIME_UNIT * 1000) { if (delta < SEND_TIME_UNIT * 1000) {
int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta); int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta);
//printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta); // printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta);
taosUsleep((int32_t)sleepLen); taosUsleep((int32_t)sleepLen);
} }
currentTs = taosGetTimestampUs(); currentTs = taosGetTimestampUs();
delta = currentTs - lastPrintTime; delta = currentTs - lastPrintTime;
if (delta > 10 * 1000 * 1000) { if (delta > 10 * 1000 * 1000) {
printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n", printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n",
pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes); pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes);
printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n", printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n", pInfo->consumerId, sendMsgs * 1000.0 * 1000 / delta,
pInfo->consumerId, (totalMsgLen / 1024.0) / (delta / (1000 * 1000)));
sendMsgs * 1000.0 * 1000 / delta, lastPrintTime = currentTs;
(totalMsgLen / 1024.0) / (delta / (1000*1000))); sendMsgs = 0;
lastPrintTime = currentTs; totalMsgLen = 0;
sendMsgs = 0; }
totalMsgLen = 0; }
}
} printf("affectedRowsTotal: %" PRId64 "\n", affectedRowsTotal);
printf("affectedRowsTotal: %"PRId64"\n", affectedRowsTotal);
taos_close(pInfo->taos); taos_close(pInfo->taos);
pInfo->taos = NULL; pInfo->taos = NULL;
taosMemoryFree(sqlBuf); taosMemoryFree(sqlBuf);
return NULL; return NULL;
} }
void printProduceInfo(int64_t start) { void printProduceInfo(int64_t start) {
int64_t totalMsgs = 0; int64_t totalMsgs = 0;
int64_t totalLenOfMsgs = 0; int64_t totalLenOfMsgs = 0;
...@@ -1347,87 +1333,86 @@ void printProduceInfo(int64_t start) { ...@@ -1347,87 +1333,86 @@ void printProduceInfo(int64_t start) {
double tInMs = (double)t / 1000000.0; double tInMs = (double)t / 1000000.0;
printf("Spent %.3f seconds to prod %" PRIu64 " msgs, %" PRIu64 " Byte\n\n", tInMs, totalMsgs, totalLenOfMsgs); printf("Spent %.3f seconds to prod %" PRIu64 " msgs, %" PRIu64 " Byte\n\n", tInMs, totalMsgs, totalLenOfMsgs);
printf("Spent %.3f seconds to prod %" PRIu64 " msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", printf("Spent %.3f seconds to prod %" PRIu64 " msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
tInMs, totalMsgs, g_stConfInfo.producers, tInMs, totalMsgs, g_stConfInfo.producers, (double)totalMsgs / tInMs,
(double)totalMsgs / tInMs, (double)totalLenOfMsgs / (1024.0 * 1024) / tInMs);
(double)totalLenOfMsgs/(1024.0*1024)/tInMs);
return; return;
} }
void startOmbConsume() { void startOmbConsume() {
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); taosThreadAttrInit(&thattr);
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
if (0 != g_stConfInfo.producers) { if (0 != g_stConfInfo.producers) {
TAOS* taos = createNewTaosConnect(); TAOS* taos = createNewTaosConnect();
if (taos == NULL) { if (taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not create db, stbl, ctbl, topic!\n"); taosFprintfFile(g_fp, "taos_connect() fail, can not create db, stbl, ctbl, topic!\n");
return ; return;
} }
char stbName[16] = "stb"; char stbName[16] = "stb";
char ctbPrefix[16] = "ctb"; char ctbPrefix[16] = "ctb";
char sql[256] = {0}; char sql[256] = {0};
sprintf(sql, "drop database if exists %s", g_stConfInfo.dbName); sprintf(sql, "drop database if exists %s", g_stConfInfo.dbName);
printf("SQL: %s\n", sql); printf("SQL: %s\n", sql);
queryDbExec(taos, sql, NO_INSERT_TYPE); queryDbExec(taos, sql, NO_INSERT_TYPE);
sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName, g_stConfInfo.producers); sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName,
printf("SQL: %s\n", sql); g_stConfInfo.producers);
printf("SQL: %s\n", sql);
queryDbExec(taos, sql, NO_INSERT_TYPE); queryDbExec(taos, sql, NO_INSERT_TYPE);
sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName, stbName, g_stConfInfo.payloadLen); sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName,
printf("SQL: %s\n", sql); stbName, g_stConfInfo.payloadLen);
printf("SQL: %s\n", sql);
queryDbExec(taos, sql, NO_INSERT_TYPE); queryDbExec(taos, sql, NO_INSERT_TYPE);
for (int i = 0; i < g_stConfInfo.producers; i++) { for (int i = 0; i < g_stConfInfo.producers; i++) {
sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i, g_stConfInfo.dbName, i); sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i,
printf("SQL: %s\n", sql); g_stConfInfo.dbName, i);
printf("SQL: %s\n", sql);
queryDbExec(taos, sql, NO_INSERT_TYPE); queryDbExec(taos, sql, NO_INSERT_TYPE);
} }
// create topic // create topic
sprintf(sql, "create topic %s as stable %s.%s", g_stConfInfo.topic, g_stConfInfo.dbName, stbName); sprintf(sql, "create topic %s as stable %s.%s", g_stConfInfo.topic, g_stConfInfo.dbName, stbName);
printf("SQL: %s\n", sql); printf("SQL: %s\n", sql);
queryDbExec(taos, sql, NO_INSERT_TYPE); queryDbExec(taos, sql, NO_INSERT_TYPE);
int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers);
int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers);
printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
printf("==== create %d produce thread ====\n", g_stConfInfo.producers); for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {
for (int32_t i = 0; i < g_stConfInfo.producers; ++i) { g_stConfInfo.stProdThreads[i].consumerId = i;
g_stConfInfo.stProdThreads[i].consumerId = i; g_stConfInfo.stProdThreads[i].producerRate = producerRate;
g_stConfInfo.stProdThreads[i].producerRate = producerRate; taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc,
taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc, (void*)(&(g_stConfInfo.stProdThreads[i])));
(void*)(&(g_stConfInfo.stProdThreads[i]))); }
}
if (0 == g_stConfInfo.numOfThread) {
if (0 == g_stConfInfo.numOfThread) { int64_t start = taosGetTimestampUs();
int64_t start = taosGetTimestampUs();
for (int32_t i = 0; i < g_stConfInfo.producers; i++) { for (int32_t i = 0; i < g_stConfInfo.producers; i++) {
taosThreadJoin(g_stConfInfo.stProdThreads[i].thread, NULL); taosThreadJoin(g_stConfInfo.stProdThreads[i].thread, NULL);
taosThreadClear(&g_stConfInfo.stProdThreads[i].thread); taosThreadClear(&g_stConfInfo.stProdThreads[i].thread);
} }
printProduceInfo(start); printProduceInfo(start);
taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
taos_close(taos); taos_close(taos);
return; return;
} }
taos_close(taos); taos_close(taos);
} }
// pthread_create one thread to consume // pthread_create one thread to consume
taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread); taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
g_stConfInfo.stThreads[i].consumerId = i; g_stConfInfo.stThreads[i].consumerId = i;
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, ombConsumeThreadFunc, taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, ombConsumeThreadFunc,
(void*)(&(g_stConfInfo.stThreads[i]))); (void*)(&(g_stConfInfo.stThreads[i])));
} }
...@@ -1446,24 +1431,23 @@ void startOmbConsume() { ...@@ -1446,24 +1431,23 @@ void startOmbConsume() {
int64_t totalLenOfMsgs = 0; int64_t totalLenOfMsgs = 0;
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt; totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen; totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen;
totalRows += g_stConfInfo.stThreads[i].consumeRowCnt; totalRows += g_stConfInfo.stThreads[i].consumeRowCnt;
} }
int64_t t = end - start; int64_t t = end - start;
if (0 == t) t = 1; if (0 == t) t = 1;
double tInMs = (double)t / 1000000.0; double tInMs = (double)t / 1000000.0;
taosFprintfFile(g_fp, taosFprintfFile(
"Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", g_fp, "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
tInMs, totalMsgs, g_stConfInfo.numOfThread, tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs),
(double)(totalMsgs / tInMs), (double)totalLenOfMsgs / (1024 * 1024) / tInMs);
(double)totalLenOfMsgs/(1024*1024)/tInMs);
printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64
tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread, " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
(double)(totalMsgs / tInMs), tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs),
(double)totalLenOfMsgs/(1024*1024)/tInMs); (double)totalLenOfMsgs / (1024 * 1024) / tInMs);
taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
...@@ -1471,20 +1455,19 @@ void startOmbConsume() { ...@@ -1471,20 +1455,19 @@ void startOmbConsume() {
return; return;
} }
int main(int32_t argc, char* argv[]) { int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv); parseArgument(argc, argv);
if (0 != strlen(g_stConfInfo.topic)) { if (0 != strlen(g_stConfInfo.topic)) {
startOmbConsume(); startOmbConsume();
return 0; return 0;
} }
int32_t retCode = getConsumeInfo(); int32_t retCode = getConsumeInfo();
if (0 != retCode) { if (0 != retCode) {
return -1; return -1;
} }
saveConfigToLogFile(); saveConfigToLogFile();
tmqSetSignalHandle(); tmqSetSignalHandle();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册