提交 79bc5797 编写于 作者: L Liu Jicong

test: fix tmq test case

上级 1a13affb
...@@ -13,51 +13,49 @@ ...@@ -13,51 +13,49 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// clang-format off
#include <assert.h> #include <assert.h>
#include <dirent.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <time.h>
#include <unistd.h> #include <unistd.h>
#include <stdlib.h>
#include <dirent.h>
#include "taos.h" #include "taos.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#define GREEN "\033[1;32m" #define GREEN "\033[1;32m"
#define NC "\033[0m" #define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b)) #define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024) #define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
typedef struct { typedef struct {
// input from argvs // input from argvs
char dbName[32]; char dbName[32];
char topicString[256]; char topicString[256];
char keyString[1024]; char keyString[1024];
int32_t showMsgFlag; int32_t showMsgFlag;
int32_t consumeDelay; // unit s int32_t consumeDelay; // unit s
int32_t consumeMsgCnt; int32_t consumeMsgCnt;
// save result after parse agrvs // save result after parse agrvs
int32_t numOfTopic; int32_t numOfTopic;
char topics[32][64]; char topics[32][64];
int32_t numOfKey; int32_t numOfKey;
char key[32][64]; char key[32][64];
char value[32][64]; char value[32][64];
} SConfInfo; } SConfInfo;
static SConfInfo g_stConfInfo; static SConfInfo g_stConfInfo;
//char* g_pRowValue = NULL; // char* g_pRowValue = NULL;
//TdFilePtr g_fp = NULL; // TdFilePtr g_fp = NULL;
static void printHelp() { static void printHelp() {
char indent[10] = " "; char indent[10] = " ";
...@@ -80,13 +78,12 @@ static void printHelp() { ...@@ -80,13 +78,12 @@ static void printHelp() {
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
void parseArgument(int32_t argc, char *argv[]) { void parseArgument(int32_t argc, char* argv[]) {
memset(&g_stConfInfo, 0, sizeof(SConfInfo)); memset(&g_stConfInfo, 0, sizeof(SConfInfo));
g_stConfInfo.showMsgFlag = 0; g_stConfInfo.showMsgFlag = 0;
g_stConfInfo.consumeDelay = 8000; g_stConfInfo.consumeDelay = 8000;
g_stConfInfo.consumeMsgCnt = 0; g_stConfInfo.consumeMsgCnt = 0;
for (int32_t i = 1; i < argc; i++) { for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
printHelp(); printHelp();
...@@ -107,7 +104,7 @@ void parseArgument(int32_t argc, char *argv[]) { ...@@ -107,7 +104,7 @@ void parseArgument(int32_t argc, char *argv[]) {
g_stConfInfo.consumeMsgCnt = atol(argv[++i]); g_stConfInfo.consumeMsgCnt = atol(argv[++i]);
} else { } else {
printf("%s unknow para: %s %s", GREEN, argv[++i], NC); printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
exit(-1); exit(-1);
} }
} }
...@@ -117,91 +114,87 @@ void parseArgument(int32_t argc, char *argv[]) { ...@@ -117,91 +114,87 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC); pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC);
pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC); pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
#endif #endif
} }
void splitStr(char **arr, char *str, const char *del) { void splitStr(char** arr, char* str, const char* del) {
char *s = strtok(str, del); char* s = strtok(str, del);
while(s != NULL) { while (s != NULL) {
*arr++ = s; *arr++ = s;
s = strtok(NULL, del); s = strtok(NULL, del);
} }
} }
void ltrim(char *str) void ltrim(char* str) {
{ if (str == NULL || *str == '\0') {
if (str == NULL || *str == '\0') return;
{ }
return; int len = 0;
} char* p = str;
int len = 0; while (*p != '\0' && isspace(*p)) {
char *p = str; ++p;
while (*p != '\0' && isspace(*p)) ++len;
{ }
++p; ++len; memmove(str, p, strlen(str) - len + 1);
} // return str;
memmove(str, p, strlen(str) - len + 1);
//return str;
} }
void parseInputString() { void parseInputString() {
//printf("topicString: %s\n", g_stConfInfo.topicString); // printf("topicString: %s\n", g_stConfInfo.topicString);
//printf("keyString: %s\n\n", g_stConfInfo.keyString); // printf("keyString: %s\n\n", g_stConfInfo.keyString);
char *token; char* token;
const char delim[2] = ","; const char delim[2] = ",";
const char ch = ':'; const char ch = ':';
token = strtok(g_stConfInfo.topicString, delim); token = strtok(g_stConfInfo.topicString, delim);
while(token != NULL) { while (token != NULL) {
//printf("%s\n", token ); // printf("%s\n", token );
strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token); strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token);
ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]); ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
//printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.numOfTopic++; g_stConfInfo.numOfTopic++;
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
token = strtok(g_stConfInfo.keyString, delim); token = strtok(g_stConfInfo.keyString, delim);
while(token != NULL) { while (token != NULL) {
//printf("%s\n", token ); // printf("%s\n", token );
{ {
char* pstr = token; char* pstr = token;
ltrim(pstr); ltrim(pstr);
char *ret = strchr(pstr, ch); char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret-pstr); memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret - pstr);
strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret+1); strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret + 1);
//printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]); // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
g_stConfInfo.numOfKey++; // g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey++;
} }
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
} }
static int running = 1;
static int running = 1;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ /*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
int queryDB(TAOS* taos, char* command) {
int queryDB(TAOS *taos, char *command) { TAOS_RES* pRes = taos_query(taos, command);
TAOS_RES *pRes = taos_query(taos, command); int code = taos_errno(pRes);
int code = taos_errno(pRes); // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
//if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { if (code != 0) {
if (code != 0) { pError("failed to reason:%s, sql: %s", tstrerror(code), command);
pError("failed to reason:%s, sql: %s", tstrerror(code), command); taos_free_result(pRes);
taos_free_result(pRes); return -1;
return -1; }
} taos_free_result(pRes);
taos_free_result(pRes); return 0;
return 0 ;
} }
tmq_t* build_consumer() { tmq_t* build_consumer() {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -209,13 +202,13 @@ tmq_t* build_consumer() { ...@@ -209,13 +202,13 @@ tmq_t* build_consumer() {
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
//tmq_conf_set(conf, "group.id", "tg2"); // tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) {
tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]); tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]);
} }
...@@ -225,7 +218,7 @@ tmq_t* build_consumer() { ...@@ -225,7 +218,7 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() { tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topic_list = tmq_list_new();
//tmq_list_append(topic_list, "test_stb_topic_1"); // tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) {
tmq_list_append(topic_list, g_stConfInfo.topics[i]); tmq_list_append(topic_list, g_stConfInfo.topics[i]);
} }
...@@ -241,19 +234,19 @@ void loop_consume(tmq_t* tmq) { ...@@ -241,19 +234,19 @@ void loop_consume(tmq_t* tmq) {
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000); TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000);
if (tmqMsg) { if (tmqMsg) {
totalMsgs++; totalMsgs++;
#if 0 #if 0
TAOS_ROW row; TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) { while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++; totalRows++;
} }
#endif #endif
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/ /*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqMsg);*/ /*msg_process(tmqMsg);*/
} }
tmq_message_destroy(tmqMsg); tmq_message_destroy(tmqMsg);
} else { } else {
break; break;
...@@ -263,13 +256,12 @@ void loop_consume(tmq_t* tmq) { ...@@ -263,13 +256,12 @@ void loop_consume(tmq_t* tmq) {
err = tmq_consumer_close(tmq); err = tmq_consumer_close(tmq);
if (err) { if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
printf("{consume success: %d, %d}", totalMsgs, totalRows); printf("{consume success: %d, %d}", totalMsgs, totalRows);
} }
void parallel_consume(tmq_t* tmq) { void parallel_consume(tmq_t* tmq) {
tmq_resp_err_t err; tmq_resp_err_t err;
...@@ -277,26 +269,26 @@ void parallel_consume(tmq_t* tmq) { ...@@ -277,26 +269,26 @@ void parallel_consume(tmq_t* tmq) {
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t skipLogNum = 0; int32_t skipLogNum = 0;
while (running) { while (running) {
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000); TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) { if (tmqMsg) {
totalMsgs++; totalMsgs++;
#if 0 #if 0
TAOS_ROW row; TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) { while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++; totalRows++;
} }
#endif #endif
skipLogNum += tmqGetSkipLogNum(tmqMsg); /*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqMsg); /*msg_process(tmqMsg);*/
} }
tmq_message_destroy(tmqMsg); tmq_message_destroy(tmqMsg);
if (totalMsgs >= g_stConfInfo.consumeMsgCnt) { if (totalMsgs >= g_stConfInfo.consumeMsgCnt) {
break; break;
} }
} else { } else {
break; break;
} }
...@@ -305,22 +297,22 @@ void parallel_consume(tmq_t* tmq) { ...@@ -305,22 +297,22 @@ void parallel_consume(tmq_t* tmq) {
err = tmq_consumer_close(tmq); err = tmq_consumer_close(tmq);
if (err) { if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
printf("%d", totalMsgs); // output to sim for check result printf("%d", totalMsgs); // output to sim for check result
} }
int main(int32_t argc, char *argv[]) { int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv); parseArgument(argc, argv);
parseInputString(); parseInputString();
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
if ((NULL == tmq) || (NULL == topic_list)){ if ((NULL == tmq) || (NULL == topic_list)) {
return -1; return -1;
} }
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
if (err) { if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册