未验证 提交 29c7cbf4 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11702 from taosdata/feature/tq

refactor(tmq)
......@@ -141,7 +141,7 @@ int32_t create_topic() {
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) {
printf("commit %d\n", resp);
}
......@@ -163,7 +163,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", "abc1");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
return tmq;
}
......@@ -189,7 +189,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
cnt++;
/*printf("get data\n");*/
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
}
......@@ -219,7 +219,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
......@@ -249,7 +249,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
batchCnt++;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
} else {
break;
}
......
......@@ -217,17 +217,17 @@ typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
// typedef struct tmq_message_t tmq_message_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *));
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
#if 1
#if 0
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
#endif
DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
......@@ -271,14 +271,19 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO
#if 0
DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res);
#endif
#if 0
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic);
DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic);
#endif
DLL_EXPORT void tmq_message_destroy(TAOS_RES *res);
#endif
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
......
......@@ -24,6 +24,7 @@
#include "tqueue.h"
#include "tref.h"
#if 0
struct tmq_message_t {
SMqPollRsp msg;
char* topic;
......@@ -31,6 +32,7 @@ struct tmq_message_t {
int32_t vgId;
int32_t resIter;
};
#endif
typedef struct {
int8_t tmqRspType;
......@@ -52,9 +54,7 @@ struct tmq_topic_vgroup_t {
};
struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
SArray container; // SArray<tmq_topic_vgroup_t*>
};
struct tmq_conf_t {
......@@ -63,6 +63,7 @@ struct tmq_conf_t {
int8_t autoCommit;
int8_t resetOffset;
uint16_t port;
uint16_t autoCommitInterval;
char* ip;
char* user;
char* pass;
......@@ -202,6 +203,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcmp(key, "auto.commit.interval.ms") == 0) {
conf->autoCommitInterval = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "auto.offset.reset") == 0) {
if (strcmp(value, "none") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
......@@ -300,7 +306,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
if (pParam->tmq->commit_cb) {
pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL);
pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL);
}
if (!pParam->async) tsem_post(&pParam->rspSem);
return 0;
......@@ -322,6 +328,7 @@ tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
return tmq_subscribe(tmq, lst);
}
#if 0
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
if (pTmq == NULL) {
......@@ -357,8 +364,9 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return pTmq;
}
#endif
tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) {
return NULL;
......@@ -369,6 +377,7 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
ASSERT(user);
ASSERT(pass);
ASSERT(conf->db);
ASSERT(conf->groupId[0]);
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) return NULL;
......@@ -429,8 +438,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
req.num = pArray->size;
req.offsets = pArray->pData;
} else {
req.num = offsets->cnt;
req.offsets = (SMqOffset*)offsets->elems;
req.num = taosArrayGetSize(&offsets->container);
req.offsets = (SMqOffset*)offsets->container.pData;
}
SCoder encoder;
......@@ -1538,16 +1547,6 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
}
#endif
#if 0
void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
SMqPollRsp* pRsp = &tmq_message->msg;
tDeleteSMqConsumeRsp(pRsp);
/*taosMemoryFree(tmq_message);*/
taosFreeQitem(tmq_message);
}
#endif
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
const char* tmq_err2str(tmq_resp_err_t err) {
......
......@@ -108,7 +108,7 @@ TEST(testCase, tmq_subscribe_ctb_Test) {
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
taos_free_result(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
......@@ -141,7 +141,7 @@ TEST(testCase, tmq_subscribe_stb_Test) {
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
taos_free_result(msg);
//printf("get msg\n");
}
}
......
......@@ -340,7 +340,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
......@@ -367,7 +367,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) {
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
}
......@@ -400,7 +400,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqmessage);*/
}
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
} else {
break;
}
......
......@@ -31,46 +31,46 @@
#define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
typedef struct {
TdThread thread;
int32_t consumerId;
TdThread thread;
int32_t consumerId;
int32_t ifCheckData;
int64_t expectMsgCnt;
int64_t consumeMsgCnt;
int32_t checkresult;
int32_t ifCheckData;
int64_t expectMsgCnt;
char topicString[1024];
char keyString[1024];
int64_t consumeMsgCnt;
int32_t checkresult;
int32_t numOfTopic;
char topics[32][64];
char topicString[1024];
char keyString[1024];
int32_t numOfKey;
char key[32][64];
char value[32][64];
int32_t numOfTopic;
char topics[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
tmq_t* tmq;
tmq_list_t* topicList;
} SThreadInfo;
typedef struct {
// input from argvs
char dbName[32];
int32_t showMsgFlag;
int32_t consumeDelay; // unit s
int32_t numOfThread;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
char dbName[32];
int32_t showMsgFlag;
int32_t consumeDelay; // unit s
int32_t numOfThread;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo;
static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL;
TdFilePtr g_fp = NULL;
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
......@@ -95,7 +95,7 @@ void initLogFile() {
TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
exit -1;
exit - 1;
};
g_fp = pFile;
......@@ -103,27 +103,27 @@ void initLogFile() {
struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(pFile, "###################################################################\n");
taosFprintfFile(pFile, "# configDir: %s\n", configDir);
taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName);
taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
taosFprintfFile(pFile, "# configDir: %s\n", configDir);
taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName);
taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(pFile, " Topics: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
taosFprintfFile(pFile, " Topics: ");
for (int i = 0; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
}
taosFprintfFile(pFile, "\n");
taosFprintfFile(pFile, "\n");
taosFprintfFile(pFile, " Key: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
for (int i = 0; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
}
taosFprintfFile(pFile, "\n");
}
taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
taosFprintfFile(pFile, "###################################################################\n");
}
......@@ -180,23 +180,23 @@ void ltrim(char* str) {
// return str;
}
static int running = 1;
static int running = 1;
static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) {
char buf[1024];
//printf("topic: %s\n", tmq_get_topic_name(msg));
//printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
// printf("topic: %s\n", tmq_get_topic_name(msg));
// printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
//taos_print_row(buf, row, fields, numOfFields);
//printf("%s\n", buf);
//taosFprintfFile(g_fp, "%s\n", buf);
// taos_print_row(buf, row, fields, numOfFields);
// printf("%s\n", buf);
// taosFprintfFile(g_fp, "%s\n", buf);
}
}
......@@ -213,7 +213,7 @@ int queryDB(TAOS* taos, char* command) {
return 0;
}
void build_consumer(SThreadInfo *pInfo) {
void build_consumer(SThreadInfo* pInfo) {
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
......@@ -233,11 +233,11 @@ void build_consumer(SThreadInfo *pInfo) {
for (int32_t i = 0; i < pInfo->numOfKey; i++) {
tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
}
pInfo->tmq = tmq_consumer_new(pConn, conf, NULL, 0);
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
return;
}
void build_topic_list(SThreadInfo *pInfo) {
void build_topic_list(SThreadInfo* pInfo) {
pInfo->topicList = tmq_list_new();
// tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
......@@ -246,48 +246,45 @@ void build_topic_list(SThreadInfo *pInfo) {
return;
}
int32_t saveConsumeResult(SThreadInfo *pInfo) {
int32_t saveConsumeResult(SThreadInfo* pInfo) {
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)",
g_stConfInfo.dbName,
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->checkresult);
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)", g_stConfInfo.dbName,
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->checkresult);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
return 0;
}
void loop_consume(SThreadInfo *pInfo) {
void loop_consume(SThreadInfo* pInfo) {
tmq_resp_err_t err;
int64_t totalMsgs = 0;
//int64_t totalRows = 0;
// int64_t totalRows = 0;
while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) {
if (tmqMsg) {
if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqMsg, totalMsgs, 0);
}
tmq_message_destroy(tmqMsg);
taos_free_result(tmqMsg);
totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) {
break;
}
......@@ -295,7 +292,7 @@ void loop_consume(SThreadInfo *pInfo) {
break;
}
}
err = tmq_consumer_close(pInfo->tmq);
if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
......@@ -303,35 +300,34 @@ void loop_consume(SThreadInfo *pInfo) {
}
pInfo->consumeMsgCnt = totalMsgs;
}
void *consumeThreadFunc(void *param) {
void* consumeThreadFunc(void* param) {
int32_t totalMsgs = 0;
SThreadInfo *pInfo = (SThreadInfo *)param;
SThreadInfo* pInfo = (SThreadInfo*)param;
build_consumer(pInfo);
build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
return NULL;
}
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
loop_consume(pInfo);
err = tmq_unsubscribe(pInfo->tmq);
if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1;
pInfo->consumeMsgCnt = -1;
return NULL;
}
}
// save consume result into consumeresult table
saveConsumeResult(pInfo);
......@@ -343,7 +339,7 @@ void parseConsumeInfo() {
const char delim[2] = ",";
const char ch = ':';
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
while (token != NULL) {
// printf("%s\n", token );
......@@ -351,10 +347,10 @@ void parseConsumeInfo() {
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.stThreads[i].numOfTopic++;
token = strtok(NULL, delim);
}
token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
while (token != NULL) {
// printf("%s\n", token );
......@@ -368,7 +364,7 @@ void parseConsumeInfo() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.stThreads[i].numOfKey++;
}
token = strtok(NULL, delim);
}
}
......@@ -376,46 +372,47 @@ void parseConsumeInfo() {
int32_t getConsumeInfo() {
char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
exit(-1);
}
TAOS_ROW row = NULL;
int num_fields = taos_num_fields(pRes);
TAOS_FIELD* fields = taos_fetch_fields(pRes);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int
}
TAOS_ROW row = NULL;
int num_fields = taos_num_fields(pRes);
TAOS_FIELD* fields = taos_fetch_fields(pRes);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
// ifcheckdata int
int32_t numOfThread = 0;
while ((row = taos_fetch_row(pRes))) {
int32_t* lengths = taos_fetch_lengths(pRes);
for (int i = 0; i < num_fields; ++i) {
int32_t* lengths = taos_fetch_lengths(pRes);
for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) {
continue;
}
if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]);
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
} else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
} else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
} else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]);
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]);
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
}
}
numOfThread ++;
numOfThread++;
}
g_stConfInfo.numOfThread = numOfThread;
......@@ -426,7 +423,6 @@ int32_t getConsumeInfo() {
return 0;
}
int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv);
getConsumeInfo();
......@@ -438,18 +434,19 @@ int main(int32_t argc, char* argv[]) {
// pthread_create one thread to consume
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i])));
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
(void*)(&(g_stConfInfo.stThreads[i])));
}
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
}
//printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile(g_fp, "\n");
taosCloseFile(&g_fp);
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile(g_fp, "\n");
taosCloseFile(&g_fp);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册