diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index fe2bb59af3b8b60211dc62c2c96cc4dee8e4acd5..9c1dc2e063ee96e7e593154fcc602c6b3be9073f 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -14,13 +14,13 @@ */ #include +#include #include #include #include #include #include #include -#include #include "taos.h" #include "taosdef.h" @@ -36,7 +36,7 @@ #define MAX_ROW_STR_LEN (16 * 1024) #define MAX_CONSUMER_THREAD_CNT (16) #define MAX_VGROUP_CNT (32) -#define SEND_TIME_UNIT 10 // ms +#define SEND_TIME_UNIT 10 // ms #define MAX_SQL_LEN 1048576 typedef enum { @@ -45,11 +45,7 @@ typedef enum { NOTIFY_CMD_ID_BUTT, } NOTIFY_CMD_ID; -typedef enum enumQUERY_TYPE { - NO_INSERT_TYPE, - INSERT_TYPE, - QUERY_TYPE_BUT -} QUERY_TYPE; +typedef enum enumQUERY_TYPE { NO_INSERT_TYPE, INSERT_TYPE, QUERY_TYPE_BUT } QUERY_TYPE; typedef struct { TdThread thread; @@ -61,7 +57,7 @@ typedef struct { // char autoOffsetRest[16]; // none, earliest, latest TdFilePtr pConsumeRowsFile; - TdFilePtr pConsumeMetaFile; + TdFilePtr pConsumeMetaFile; int32_t ifCheckData; int64_t expectMsgCnt; @@ -87,12 +83,12 @@ typedef struct { int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume int64_t ts; - TAOS* taos; + TAOS* taos; // below parameters is used by omb test - int32_t producerRate; // unit: msgs/s - int64_t totalProduceMsgs; - int64_t totalMsgsLen; + int32_t producerRate; // unit: msgs/s + int64_t totalProduceMsgs; + int64_t totalMsgsLen; } SThreadInfo; @@ -112,12 +108,12 @@ typedef struct { SThreadInfo stProdThreads[MAX_CONSUMER_THREAD_CNT]; // below parameters is used by omb test - char topic[64]; - int32_t producers; - int32_t producerRate; - int32_t runDurationMinutes; - int32_t batchSize; - int32_t payloadLen; + char topic[64]; + int32_t producers; + int32_t producerRate; + int32_t runDurationMinutes; + int32_t batchSize; + int32_t payloadLen; } SConfInfo; static SConfInfo g_stConfInfo; @@ -146,14 +142,13 @@ static void printHelp() { printf("%s%s%s%ds\n", indent, indent, "consume delay, default is ", g_stConfInfo.consumeDelay); printf("%s%s\n", indent, "-e"); printf("%s%s%s%d\n", indent, indent, "snapshot, default is ", g_stConfInfo.useSnapshot); - + printf("%s%s\n", indent, "-t"); printf("%s%s%s\n", indent, indent, "topic name, default is null"); printf("%s%s\n", indent, "-x"); printf("%s%s%s\n", indent, indent, "consume thread number, default is 1"); - printf("%s%s\n", indent, "-l"); printf("%s%s%s%d\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes); printf("%s%s\n", indent, "-p"); @@ -165,7 +160,6 @@ static void printHelp() { printf("%s%s\n", indent, "-n"); printf("%s%s%s\n", indent, indent, "payload len unit is byte, default is 1000"); - exit(EXIT_SUCCESS); } @@ -194,7 +188,7 @@ void initLogFile() { pid_t process_id = getpid(); if (0 != strlen(g_stConfInfo.topic)) { - sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString)); + sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString)); } else { sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString)); } @@ -294,7 +288,7 @@ void parseArgument(int32_t argc, char* argv[]) { g_stConfInfo.producerRate = atol(argv[++i]); } else if (strcmp(argv[i], "-n") == 0) { g_stConfInfo.payloadLen = atol(argv[++i]); - if(g_stConfInfo.payloadLen <= 0 || g_stConfInfo.payloadLen > 1024 * 1024 * 1024){ + if (g_stConfInfo.payloadLen <= 0 || g_stConfInfo.payloadLen > 1024 * 1024 * 1024) { pError("%s calloc size is too large: %s %s", GREEN, argv[++i], NC); exit(-1); } @@ -357,9 +351,9 @@ void ltrim(char* str) { } int queryDB(TAOS* taos, char* command) { - int retryCnt = 10; - int code = 0; - TAOS_RES* pRes = NULL; + int retryCnt = 10; + int code = 0; + TAOS_RES* pRes = NULL; while (retryCnt--) { pRes = taos_query(taos, command); @@ -379,7 +373,6 @@ int queryDB(TAOS* taos, char* command) { return -1; } - void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) { int32_t i; for (i = 0; i < pInfo->numOfVgroups; i++) { @@ -403,22 +396,21 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) { } TAOS* createNewTaosConnect() { - TAOS* taos = NULL; - int32_t retryCnt = 10; + TAOS* taos = NULL; + int32_t retryCnt = 10; while (retryCnt--) { TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0); - if (NULL != taos) { - return taos; - } - taosSsleep(1); + if (NULL != taos) { + return taos; + } + taosSsleep(1); } taosFprintfFile(g_fp, "taos_connect() fail\n"); return NULL; } - int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) { char sqlStr[1100] = {0}; @@ -440,7 +432,7 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) { if (retCode != 0) { taosFprintfFile(g_fp, "error in save consume content\n"); taosCloseFile(&g_fp); - taos_close(pConn); + taos_close(pConn); exit(-1); } @@ -481,7 +473,7 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) { struct tm ptm; taosLocalTime(&tt, &ptm); - size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); if (precision == TSDB_TIME_PRECISION_NANO) { sprintf(buf + pos, ".%09d", ms); @@ -548,22 +540,20 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - case TSDB_DATA_TYPE_JSON: - { - int32_t bufIndex = 0; - for (int32_t i = 0; i < length; i++) { + case TSDB_DATA_TYPE_JSON: { + int32_t bufIndex = 0; + for (int32_t i = 0; i < length; i++) { + buf[bufIndex] = val[i]; + bufIndex++; + if (val[i] == '\"') { buf[bufIndex] = val[i]; bufIndex++; - if (val[i] == '\"') { - buf[bufIndex] = val[i]; - bufIndex++; - } } - buf[bufIndex] = 0; - - taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr); } - break; + buf[bufIndex] = 0; + + taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr); + } break; case TSDB_DATA_TYPE_TIMESTAMP: shellFormatTimestamp(buf, *(int64_t*)val, precision); taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr); @@ -635,7 +625,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn return totalRows; } - static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) { char buf[1024]; int32_t totalRows = 0; @@ -650,24 +639,24 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn { tmq_raw_data raw = {0}; - int32_t code = tmq_get_raw(msg, &raw); - - if(code == TSDB_CODE_SUCCESS){ -// int retCode = queryDB(pInfo->taos, "use metadb"); -// if (retCode != 0) { -// taosFprintfFile(g_fp, "error when use metadb\n"); -// taosCloseFile(&g_fp); -// exit(-1); -// } -// taosFprintfFile(g_fp, "raw:%p\n", &raw); -// -// tmq_write_raw(pInfo->taos, raw); + int32_t code = tmq_get_raw(msg, &raw); + + if (code == TSDB_CODE_SUCCESS) { + // int retCode = queryDB(pInfo->taos, "use metadb"); + // if (retCode != 0) { + // taosFprintfFile(g_fp, "error when use metadb\n"); + // taosCloseFile(&g_fp); + // exit(-1); + // } + // taosFprintfFile(g_fp, "raw:%p\n", &raw); + // + // tmq_write_raw(pInfo->taos, raw); } - + char* result = tmq_get_json_meta(msg); - if(result && strcmp(result, "") != 0){ - //printf("meta result: %s\n", result); - taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result); + if (result && strcmp(result, "") != 0) { + // printf("meta result: %s\n", result); + taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result); } tmq_free_json_meta(result); } @@ -683,8 +672,8 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { char sqlStr[1024] = {0}; // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int - sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId, - pInfo->consumerId); + sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, + atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId, pInfo->consumerId); taos_query_a(pInfo->taos, sqlStr, appNothing, NULL); @@ -695,15 +684,15 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { static int32_t g_once_commit_flag = 0; static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); + taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (0 == g_once_commit_flag) { - g_once_commit_flag = 1; - notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); + if (0 == g_once_commit_flag) { + g_once_commit_flag = 1; + notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } - char tmpString[128]; - taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); + char tmpString[128]; + taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); } void build_consumer(SThreadInfo* pInfo) { @@ -768,7 +757,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { int retCode = queryDB(pInfo->taos, sqlStr); if (retCode != 0) { - taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId); + taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId); return -1; } @@ -797,9 +786,9 @@ void loop_consume(SThreadInfo* pInfo) { sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId); pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); - sprintf(filename, "%s/../log/meta_consumerid_%d.txt", configDir, pInfo->consumerId); - pInfo->pConsumeMetaFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); - + sprintf(filename, "%s/../log/meta_consumerid_%d.txt", configDir, pInfo->consumerId); + pInfo->pConsumeMetaFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + if (pInfo->pConsumeRowsFile == NULL || pInfo->pConsumeMetaFile == NULL) { taosFprintfFile(g_fp, "%s create file fail for save rows or save meta\n", getCurrentTimeString(tmpString)); return; @@ -815,15 +804,15 @@ void loop_consume(SThreadInfo* pInfo) { TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); if (tmqMsg) { if (0 != g_stConfInfo.showMsgFlag) { - tmq_res_t msgType = tmq_get_res_type(tmqMsg); - if (msgType == TMQ_RES_TABLE_META) { - totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs); - } else if (msgType == TMQ_RES_DATA){ - totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); - } else if (msgType == TMQ_RES_METADATA){ - meta_msg_process(tmqMsg, pInfo, totalMsgs); - totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); - } + tmq_res_t msgType = tmq_get_res_type(tmqMsg); + if (msgType == TMQ_RES_TABLE_META) { + totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs); + } else if (msgType == TMQ_RES_DATA) { + totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); + } else if (msgType == TMQ_RES_METADATA) { + meta_msg_process(tmqMsg, pInfo, totalMsgs); + totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); + } } taos_free_result(tmqMsg); @@ -865,7 +854,7 @@ void loop_consume(SThreadInfo* pInfo) { taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n", pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); - + taosFsyncFile(pInfo->pConsumeRowsFile); taosCloseFile(&pInfo->pConsumeRowsFile); } @@ -882,8 +871,8 @@ void* consumeThreadFunc(void* param) { build_consumer(pInfo); build_topic_list(pInfo); if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { - taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n"); - taos_close(pInfo->taos); + taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n"); + taos_close(pInfo->taos); pInfo->taos = NULL; return NULL; } @@ -892,7 +881,7 @@ void* consumeThreadFunc(void* param) { if (err != 0) { pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err)); - taos_close(pInfo->taos); + taos_close(pInfo->taos); pInfo->taos = NULL; return NULL; } @@ -947,7 +936,8 @@ void parseConsumeInfo() { token = strtok(g_stConfInfo.stThreads[i].topicString, delim); while (token != NULL) { // printf("%s\n", token ); - tstrncpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token, sizeof(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic])); + tstrncpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token, + sizeof(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic])); ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); g_stConfInfo.stThreads[i].numOfTopic++; @@ -963,7 +953,8 @@ void parseConsumeInfo() { ltrim(pstr); char* ret = strchr(pstr, ch); memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr); - tstrncpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1, sizeof(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey])); + tstrncpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1, + sizeof(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey])); // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], // g_stConfInfo.value[g_stConfInfo.numOfKey]); g_stConfInfo.stThreads[i].numOfKey++; @@ -984,12 +975,12 @@ int32_t getConsumeInfo() { } sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName); - TAOS_RES *pRes = taos_query(pConn, sqlStr); + TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { taosFprintfFile(g_fp, "error in get consumeinfo for %s\n", taos_errstr(pRes)); taosCloseFile(&g_fp); taos_free_result(pRes); - taos_close(pConn); + taos_close(pConn); return -1; } @@ -1040,19 +1031,18 @@ int32_t getConsumeInfo() { return 0; } - static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex, int64_t* lenOfRows) { - char buf[16*1024]; + char buf[16 * 1024]; int32_t totalRows = 0; int32_t totalLen = 0; // printf("topic: %s\n", tmq_get_topic_name(msg)); - //int32_t vgroupId = tmq_get_vgroup_id(msg); - //const char* dbName = tmq_get_db_name(msg); + // int32_t vgroupId = tmq_get_vgroup_id(msg); + // const char* dbName = tmq_get_db_name(msg); - //taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex); - //taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", - // tmq_get_topic_name(msg), vgroupId); + // taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex); + // taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", + // tmq_get_topic_name(msg), vgroupId); while (1) { TAOS_ROW row = taos_fetch_row(msg); @@ -1061,9 +1051,9 @@ static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t m TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); - //int32_t* length = taos_fetch_lengths(msg); - //int32_t precision = taos_result_precision(msg); - //const char* tbName = tmq_get_table_name(msg); + // int32_t* length = taos_fetch_lengths(msg); + // int32_t precision = taos_result_precision(msg); + // const char* tbName = tmq_get_table_name(msg); taos_print_row(buf, row, fields, numOfFields); totalLen += strlen(buf); @@ -1085,8 +1075,7 @@ void omb_loop_consume(SThreadInfo* pInfo) { char tmpString[128]; taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId); - printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), - pInfo->consumerId); + printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId); pInfo->ts = taosGetTimestampMs(); @@ -1094,55 +1083,55 @@ void omb_loop_consume(SThreadInfo* pInfo) { uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs(); - int64_t totalLenOfMsg = 0; - int64_t lastTotalLenOfMsg = 0; + int64_t totalLenOfMsg = 0; + int64_t lastTotalLenOfMsg = 0; int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); while (running) { TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); if (tmqMsg) { - int64_t lenOfMsg = 0; + int64_t lenOfMsg = 0; totalRows += omb_data_msg_process(tmqMsg, pInfo, totalMsgs, &lenOfMsg); - totalLenOfMsg += lenOfMsg; + totalLenOfMsg += lenOfMsg; taos_free_result(tmqMsg); totalMsgs++; int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 10 * 1000) { - int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg; - int64_t deltaTime = currentPrintTime - lastPrintTime; - printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n", - pInfo->consumerId, totalRows, totalMsgs, - (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, - currentLenOfMsg*1000.0/(1024*1024)/deltaTime); - - taosFprintfFile( - g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period cons rate: %.3f msgs/s, %.1f MB/s\n", - pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, currentLenOfMsg*1000.0/deltaTime); + int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg; + int64_t deltaTime = currentPrintTime - lastPrintTime; + printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 + ", rate: %.3f msgs/s, %.1f MB/s\n", + pInfo->consumerId, totalRows, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, + currentLenOfMsg * 1000.0 / (1024 * 1024) / deltaTime); + + taosFprintfFile(g_fp, + "consumer id %d has currently poll total msgs: %" PRId64 + ", period cons rate: %.3f msgs/s, %.1f MB/s\n", + pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, + currentLenOfMsg * 1000.0 / deltaTime); lastPrintTime = currentPrintTime; lastTotalMsgs = totalMsgs; - lastTotalLenOfMsg = totalLenOfMsg; + lastTotalLenOfMsg = totalLenOfMsg; } } else { char tmpString[128]; taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); - printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); + printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg; - int64_t deltaTime = currentPrintTime - lastPrintTime; - printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n", - pInfo->consumerId, totalRows, totalMsgs, - (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, - currentLenOfMsg*1000.0/(1024*1024)/deltaTime); + int64_t deltaTime = currentPrintTime - lastPrintTime; + printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 + ", rate: %.3f msgs/s, %.1f MB/s\n", + pInfo->consumerId, totalRows, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, + currentLenOfMsg * 1000.0 / (1024 * 1024) / deltaTime); break; } } pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeRowCnt = totalRows; - pInfo->consumeLen = totalLenOfMsg; - + pInfo->consumeLen = totalLenOfMsg; } - void* ombConsumeThreadFunc(void* param) { SThreadInfo* pInfo = (SThreadInfo*)param; @@ -1209,26 +1198,24 @@ void* ombConsumeThreadFunc(void* param) { return NULL; } +static int queryDbExec(TAOS* taos, char* command, QUERY_TYPE type) { + TAOS_RES* res = taos_query(taos, command); + int32_t code = taos_errno(res); + if (code != 0) { + pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC); + taos_free_result(res); + return -1; + } -static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) { - TAOS_RES *res = taos_query(taos, command); - int32_t code = taos_errno(res); - - if (code != 0) { - pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC); - taos_free_result(res); - return -1; - } - - if (INSERT_TYPE == type) { - int affectedRows = taos_affected_rows(res); - taos_free_result(res); - return affectedRows; - } - + if (INSERT_TYPE == type) { + int affectedRows = taos_affected_rows(res); taos_free_result(res); - return 0; + return affectedRows; + } + + taos_free_result(res); + return 0; } void* ombProduceThreadFunc(void* param) { @@ -1236,101 +1223,100 @@ void* ombProduceThreadFunc(void* param) { pInfo->taos = createNewTaosConnect(); if (pInfo->taos == NULL) { - taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n"); + taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n"); return NULL; } int64_t affectedRowsTotal = 0; int64_t sendMsgs = 0; - uint32_t totalSendLoopTimes = g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT; // send some msgs per 10ms - uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize; - uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize; + uint32_t totalSendLoopTimes = + g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT; // send some msgs per 10ms + uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize; + uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize; if (remainder) { - batchPerTblTimes += 1; + batchPerTblTimes += 1; } char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN); if (NULL == sqlBuf) { printf("malloc fail for sqlBuf\n"); - taos_close(pInfo->taos); + taos_close(pInfo->taos); pInfo->taos = NULL; - return NULL; + return NULL; } - printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes, batchPerTblTimes, pInfo->producerRate); + printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes, + batchPerTblTimes, pInfo->producerRate); char ctbName[128] = {0}; sprintf(ctbName, "%s.ctb%d", g_stConfInfo.dbName, pInfo->consumerId); - int64_t lastPrintTime = taosGetTimestampUs(); - int64_t totalMsgLen = 0; - //int64_t timeStamp = taosGetTimestampUs(); + int64_t lastPrintTime = taosGetTimestampUs(); + int64_t totalMsgLen = 0; + // int64_t timeStamp = taosGetTimestampUs(); while (totalSendLoopTimes) { - int64_t startTs = taosGetTimestampUs(); + int64_t startTs = taosGetTimestampUs(); for (int i = 0; i < batchPerTblTimes; ++i) { - uint32_t msgsOfSql = g_stConfInfo.batchSize; - if ((i == batchPerTblTimes - 1) && (0 != remainder)) { - msgsOfSql = remainder; - } + uint32_t msgsOfSql = g_stConfInfo.batchSize; + if ((i == batchPerTblTimes - 1) && (0 != remainder)) { + msgsOfSql = remainder; + } int len = 0; - len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "insert into %s values ", ctbName); + len += snprintf(sqlBuf + len, MAX_SQL_LEN - len, "insert into %s values ", ctbName); for (int j = 0; j < msgsOfSql; j++) { - int64_t timeStamp = taosGetTimestampNs(); - len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload); - sendMsgs++; - pInfo->totalProduceMsgs++; - } - - totalMsgLen += len; - pInfo->totalMsgsLen += len; - - int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE); - if (affectedRows < 0) { - taos_close(pInfo->taos); - pInfo->taos = NULL; - taosMemoryFree(sqlBuf); - return NULL; - } - - affectedRowsTotal += affectedRows; - - //printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows); + int64_t timeStamp = taosGetTimestampNs(); + len += snprintf(sqlBuf + len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload); + sendMsgs++; + pInfo->totalProduceMsgs++; + } + + totalMsgLen += len; + pInfo->totalMsgsLen += len; + + int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE); + if (affectedRows < 0) { + taos_close(pInfo->taos); + pInfo->taos = NULL; + taosMemoryFree(sqlBuf); + return NULL; + } + + affectedRowsTotal += affectedRows; + + // printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows); } totalSendLoopTimes -= 1; - // calc spent time - int64_t currentTs = taosGetTimestampUs(); - int64_t delta = currentTs - startTs; - if (delta < SEND_TIME_UNIT * 1000) { - int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta); - //printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta); - taosUsleep((int32_t)sleepLen); - } + // calc spent time + int64_t currentTs = taosGetTimestampUs(); + int64_t delta = currentTs - startTs; + if (delta < SEND_TIME_UNIT * 1000) { + int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta); + // printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta); + taosUsleep((int32_t)sleepLen); + } currentTs = taosGetTimestampUs(); delta = currentTs - lastPrintTime; - if (delta > 10 * 1000 * 1000) { - printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n", - pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes); - printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n", - pInfo->consumerId, - sendMsgs * 1000.0 * 1000 / delta, - (totalMsgLen / 1024.0) / (delta / (1000*1000))); - lastPrintTime = currentTs; - sendMsgs = 0; - totalMsgLen = 0; - } - } - - printf("affectedRowsTotal: %"PRId64"\n", affectedRowsTotal); + if (delta > 10 * 1000 * 1000) { + printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n", + pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes); + printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n", pInfo->consumerId, sendMsgs * 1000.0 * 1000 / delta, + (totalMsgLen / 1024.0) / (delta / (1000 * 1000))); + lastPrintTime = currentTs; + sendMsgs = 0; + totalMsgLen = 0; + } + } + + printf("affectedRowsTotal: %" PRId64 "\n", affectedRowsTotal); taos_close(pInfo->taos); pInfo->taos = NULL; taosMemoryFree(sqlBuf); return NULL; } - void printProduceInfo(int64_t start) { int64_t totalMsgs = 0; int64_t totalLenOfMsgs = 0; @@ -1347,87 +1333,86 @@ void printProduceInfo(int64_t start) { double tInMs = (double)t / 1000000.0; printf("Spent %.3f seconds to prod %" PRIu64 " msgs, %" PRIu64 " Byte\n\n", tInMs, totalMsgs, totalLenOfMsgs); - printf("Spent %.3f seconds to prod %" PRIu64 " msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", - tInMs, totalMsgs, g_stConfInfo.producers, - (double)totalMsgs / tInMs, - (double)totalLenOfMsgs/(1024.0*1024)/tInMs); + tInMs, totalMsgs, g_stConfInfo.producers, (double)totalMsgs / tInMs, + (double)totalLenOfMsgs / (1024.0 * 1024) / tInMs); return; } - void startOmbConsume() { - TdThreadAttr thattr; - taosThreadAttrInit(&thattr); - taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); if (0 != g_stConfInfo.producers) { TAOS* taos = createNewTaosConnect(); if (taos == NULL) { taosFprintfFile(g_fp, "taos_connect() fail, can not create db, stbl, ctbl, topic!\n"); - return ; + return; } - char stbName[16] = "stb"; - char ctbPrefix[16] = "ctb"; - + char stbName[16] = "stb"; + char ctbPrefix[16] = "ctb"; + char sql[256] = {0}; sprintf(sql, "drop database if exists %s", g_stConfInfo.dbName); - printf("SQL: %s\n", sql); + printf("SQL: %s\n", sql); queryDbExec(taos, sql, NO_INSERT_TYPE); - - sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName, g_stConfInfo.producers); - printf("SQL: %s\n", sql); + + sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName, + g_stConfInfo.producers); + printf("SQL: %s\n", sql); queryDbExec(taos, sql, NO_INSERT_TYPE); - sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName, stbName, g_stConfInfo.payloadLen); - printf("SQL: %s\n", sql); + sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName, + stbName, g_stConfInfo.payloadLen); + printf("SQL: %s\n", sql); queryDbExec(taos, sql, NO_INSERT_TYPE); - for (int i = 0; i < g_stConfInfo.producers; i++) { - sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i, g_stConfInfo.dbName, i); - printf("SQL: %s\n", sql); + for (int i = 0; i < g_stConfInfo.producers; i++) { + sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i, + g_stConfInfo.dbName, i); + printf("SQL: %s\n", sql); queryDbExec(taos, sql, NO_INSERT_TYPE); - } + } - // create topic + // create topic sprintf(sql, "create topic %s as stable %s.%s", g_stConfInfo.topic, g_stConfInfo.dbName, stbName); - printf("SQL: %s\n", sql); + printf("SQL: %s\n", sql); queryDbExec(taos, sql, NO_INSERT_TYPE); - - - int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers); - - printf("==== create %d produce thread ====\n", g_stConfInfo.producers); - for (int32_t i = 0; i < g_stConfInfo.producers; ++i) { - g_stConfInfo.stProdThreads[i].consumerId = i; - g_stConfInfo.stProdThreads[i].producerRate = producerRate; - taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc, - (void*)(&(g_stConfInfo.stProdThreads[i]))); - } - - if (0 == g_stConfInfo.numOfThread) { - int64_t start = taosGetTimestampUs(); + + int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers); + + printf("==== create %d produce thread ====\n", g_stConfInfo.producers); + for (int32_t i = 0; i < g_stConfInfo.producers; ++i) { + g_stConfInfo.stProdThreads[i].consumerId = i; + g_stConfInfo.stProdThreads[i].producerRate = producerRate; + taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc, + (void*)(&(g_stConfInfo.stProdThreads[i]))); + } + + if (0 == g_stConfInfo.numOfThread) { + int64_t start = taosGetTimestampUs(); for (int32_t i = 0; i < g_stConfInfo.producers; i++) { taosThreadJoin(g_stConfInfo.stProdThreads[i].thread, NULL); taosThreadClear(&g_stConfInfo.stProdThreads[i].thread); } - printProduceInfo(start); - - taosFprintfFile(g_fp, "==== close tmqlog ====\n"); - taosCloseFile(&g_fp); - taos_close(taos); + printProduceInfo(start); + + taosFprintfFile(g_fp, "==== close tmqlog ====\n"); + taosCloseFile(&g_fp); + taos_close(taos); return; - } + } - taos_close(taos); + taos_close(taos); } // pthread_create one thread to consume taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread); for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { - g_stConfInfo.stThreads[i].consumerId = i; + g_stConfInfo.stThreads[i].consumerId = i; taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, ombConsumeThreadFunc, (void*)(&(g_stConfInfo.stThreads[i]))); } @@ -1446,24 +1431,23 @@ void startOmbConsume() { int64_t totalLenOfMsgs = 0; for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt; - totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen; - totalRows += g_stConfInfo.stThreads[i].consumeRowCnt; + totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen; + totalRows += g_stConfInfo.stThreads[i].consumeRowCnt; } int64_t t = end - start; if (0 == t) t = 1; double tInMs = (double)t / 1000000.0; - taosFprintfFile(g_fp, - "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", - tInMs, totalMsgs, g_stConfInfo.numOfThread, - (double)(totalMsgs / tInMs), - (double)totalLenOfMsgs/(1024*1024)/tInMs); + taosFprintfFile( + g_fp, "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", + tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs), + (double)totalLenOfMsgs / (1024 * 1024) / tInMs); - printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", - tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread, - (double)(totalMsgs / tInMs), - (double)totalLenOfMsgs/(1024*1024)/tInMs); + printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64 + " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", + tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs), + (double)totalLenOfMsgs / (1024 * 1024) / tInMs); taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosCloseFile(&g_fp); @@ -1471,20 +1455,19 @@ void startOmbConsume() { return; } - int main(int32_t argc, char* argv[]) { parseArgument(argc, argv); if (0 != strlen(g_stConfInfo.topic)) { startOmbConsume(); - return 0; + return 0; } - + int32_t retCode = getConsumeInfo(); if (0 != retCode) { return -1; } - + saveConfigToLogFile(); tmqSetSignalHandle();