提交 4bc09c22 编写于 作者: P plum-lihui

test:merge 3.0

上级 b25442a2
......@@ -22,9 +22,9 @@
#include <time.h>
#include "taos.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tlog.h"
#include "taosdef.h"
#include "types.h"
#define GREEN "\033[1;32m"
......@@ -36,11 +36,7 @@
#define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32)
typedef enum {
NOTIFY_CMD_START_CONSUM,
NOTIFY_CMD_START_COMMIT,
NOTIFY_CMD_ID_BUTT
}NOTIFY_CMD_ID;
typedef enum { NOTIFY_CMD_START_CONSUM, NOTIFY_CMD_START_COMMIT, NOTIFY_CMD_ID_BUTT } NOTIFY_CMD_ID;
typedef struct {
TdThread thread;
......@@ -52,8 +48,8 @@ typedef struct {
// char autoOffsetRest[16]; // none, earliest, latest
TdFilePtr pConsumeRowsFile;
int32_t ifCheckData;
int64_t expectMsgCnt;
int32_t ifCheckData;
int64_t expectMsgCnt;
int64_t consumeMsgCnt;
int64_t consumeRowCnt;
......@@ -89,6 +85,7 @@ typedef struct {
int32_t saveRowFlag;
int32_t consumeDelay; // unit s
int32_t numOfThread;
int32_t useSnapshot;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo;
......@@ -217,6 +214,8 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo.saveRowFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-y") == 0) {
g_stConfInfo.consumeDelay = atol(argv[++i]);
} else if (strcmp(argv[i], "-e") == 0) {
g_stConfInfo.useSnapshot = atol(argv[++i]);
} else {
pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
exit(-1);
......@@ -310,11 +309,11 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
return 0;
}
static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
//if (shell.args.is_raw_time) {
// sprintf(buf, "%" PRId64, val);
// return buf;
//}
static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
// if (shell.args.is_raw_time) {
// sprintf(buf, "%" PRId64, val);
// return buf;
// }
time_t tt;
int32_t ms = 0;
......@@ -352,7 +351,7 @@ static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
}
}
struct tm *ptm = taosLocalTime(&tt, NULL);
struct tm* ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
......@@ -366,7 +365,8 @@ static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
return buf;
}
static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision) {
static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length,
int32_t precision) {
if (val == NULL) {
taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR);
return;
......@@ -376,31 +376,31 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
taosFprintfFile(pFile, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0));
taosFprintfFile(pFile, "%d", ((((int32_t)(*((char*)val))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
taosFprintfFile(pFile, "%d", *((int8_t *)val));
taosFprintfFile(pFile, "%d", *((int8_t*)val));
break;
case TSDB_DATA_TYPE_UTINYINT:
taosFprintfFile(pFile, "%u", *((uint8_t *)val));
taosFprintfFile(pFile, "%u", *((uint8_t*)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
taosFprintfFile(pFile, "%d", *((int16_t *)val));
taosFprintfFile(pFile, "%d", *((int16_t*)val));
break;
case TSDB_DATA_TYPE_USMALLINT:
taosFprintfFile(pFile, "%u", *((uint16_t *)val));
taosFprintfFile(pFile, "%u", *((uint16_t*)val));
break;
case TSDB_DATA_TYPE_INT:
taosFprintfFile(pFile, "%d", *((int32_t *)val));
taosFprintfFile(pFile, "%d", *((int32_t*)val));
break;
case TSDB_DATA_TYPE_UINT:
taosFprintfFile(pFile, "%u", *((uint32_t *)val));
taosFprintfFile(pFile, "%u", *((uint32_t*)val));
break;
case TSDB_DATA_TYPE_BIGINT:
taosFprintfFile(pFile, "%" PRId64, *((int64_t *)val));
taosFprintfFile(pFile, "%" PRId64, *((int64_t*)val));
break;
case TSDB_DATA_TYPE_UBIGINT:
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val));
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t*)val));
break;
case TSDB_DATA_TYPE_FLOAT:
taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
......@@ -421,7 +421,7 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f
taosFprintfFile(pFile, "\'%s\'", buf);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
shellFormatTimestamp(buf, *(int64_t *)val, precision);
shellFormatTimestamp(buf, *(int64_t*)val, precision);
taosFprintfFile(pFile, "'%s'", buf);
break;
default:
......@@ -429,12 +429,13 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f
}
}
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields, int32_t precision) {
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields,
int32_t precision) {
for (int32_t i = 0; i < num_fields; i++) {
if (i > 0) {
taosFprintfFile(pFile, "\n");
}
shellDumpFieldToFile(pFile, (const char *)row[i], fields + i, length[i], precision);
shellDumpFieldToFile(pFile, (const char*)row[i], fields + i, length[i], precision);
}
taosFprintfFile(pFile, "\n");
}
......@@ -444,40 +445,42 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
int32_t totalRows = 0;
// printf("topic: %s\n", tmq_get_topic_name(msg));
int32_t vgroupId = tmq_get_vgroup_id(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
const char* dbName = tmq_get_db_name(msg);
taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", tmq_get_topic_name(msg), vgroupId);
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
tmq_get_topic_name(msg), vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
#if 0
#if 0
// get schema
//============================== stub =================================================//
for (int32_t i = 0; i < numOfFields; i++) {
taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
}
//============================== stub =================================================//
#endif
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
#endif
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
taos_print_row(buf, row, fields, numOfFields);
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
//if (0 != g_stConfInfo.saveRowFlag) {
// saveConsumeContentToTbl(pInfo, buf);
//}
// if (0 != g_stConfInfo.saveRowFlag) {
// saveConsumeContentToTbl(pInfo, buf);
// }
}
totalRows++;
......@@ -500,8 +503,7 @@ int queryDB(TAOS* taos, char* command) {
return 0;
}
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {
}
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
char sqlStr[1024] = {0};
......@@ -509,11 +511,8 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
int64_t now = taosGetTimestampMs();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.notifyinfo values (%"PRId64", %d, %d)",
g_stConfInfo.cdbName,
now,
cmdId,
pInfo->consumerId);
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, now, cmdId,
pInfo->consumerId);
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
......@@ -523,12 +522,12 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
}
static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
pError("tmq_commit_cb_print() commit %d\n", code);
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
pError("tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) {
g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
}
char tmpString[128];
......@@ -564,6 +563,10 @@ void build_consumer(SThreadInfo* pInfo) {
// tmq_conf_set(conf, "auto.offset.reset", "none");
// tmq_conf_set(conf, "auto.offset.reset", "earliest");
// tmq_conf_set(conf, "auto.offset.reset", "latest");
//
if (g_stConfInfo.useSnapshot) {
tmq_conf_set(conf, "experiment.use.snapshot", "true");
}
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
......@@ -618,12 +621,13 @@ void loop_consume(SThreadInfo* pInfo) {
pInfo->consumerId);
pInfo->ts = taosGetTimestampMs();
if (pInfo->ifCheckData) {
char filename[256] = {0};
char filename[256] = {0};
char tmpString[128];
//sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, getCurrentTimeString(tmpString));
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
// sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId,
// getCurrentTimeString(tmpString));
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (pInfo->pConsumeRowsFile == NULL) {
taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString));
......@@ -642,10 +646,10 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++;
if (0 == once_flag) {
if (0 == once_flag) {
once_flag = 1;
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
}
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
}
if (totalRows >= pInfo->expectMsgCnt) {
char tmpString[128];
......@@ -678,7 +682,7 @@ void* consumeThreadFunc(void* param) {
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
return NULL;
return NULL;
}
build_consumer(pInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册