From 684558f9d991369093776f9eddaafb76600aef72 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 15 Feb 2022 15:33:36 +0800 Subject: [PATCH] add perf test --- example/src/tmq.c | 38 ++++++++- include/client/taos.h | 175 ++++++++++++++++++++-------------------- source/client/src/tmq.c | 6 ++ 3 files changed, 131 insertions(+), 88 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index bad789d0e1..5858282aab 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -63,7 +63,7 @@ int32_t init_env() { } taos_free_result(pRes); - const char* sql = "select * from tu2"; + const char* sql = "select * from tu1"; pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); if (taos_errno(pRes) != 0) { printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); @@ -159,11 +159,45 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { fprintf(stderr, "%% Consumer closed\n"); } +void perf_loop(tmq_t* tmq, tmq_list_t* topics) { + tmq_resp_err_t err; + + if ((err = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); + printf("subscribe err\n"); + return; + } + int32_t batchCnt = 0; + int32_t skipLogNum = 0; + clock_t startTime = clock(); + while (running) { + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + if (tmqmessage) { + batchCnt++; + skipLogNum += tmqGetSkipLogNum(tmqmessage); + /*msg_process(tmqmessage);*/ + tmq_message_destroy(tmqmessage); + } else { + break; + } + } + clock_t endTime = clock(); + printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum, + (double)(endTime - startTime) / CLOCKS_PER_SEC); + + err = tmq_consumer_close(tmq); + if (err) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + int main() { int code; code = init_env(); tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); - basic_consume_loop(tmq, topic_list); + perf_loop(tmq, topic_list); + /*basic_consume_loop(tmq, topic_list);*/ /*sync_consume_loop(tmq, topic_list);*/ } diff --git a/include/client/taos.h b/include/client/taos.h index a8627a43da..9b676d1286 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -16,8 +16,8 @@ #ifndef TDENGINE_TAOS_H #define TDENGINE_TAOS_H -#include #include +#include #ifdef __cplusplus extern "C" { @@ -31,26 +31,26 @@ typedef void TAOS_SUB; typedef void **TAOS_ROW; // Data type definition -#define TSDB_DATA_TYPE_NULL 0 // 1 bytes -#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes -#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte -#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes -#define TSDB_DATA_TYPE_INT 4 // 4 bytes -#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes -#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes -#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes -#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar -#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes -#define TSDB_DATA_TYPE_NCHAR 10 // unicode string -#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte -#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes -#define TSDB_DATA_TYPE_UINT 13 // 4 bytes -#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes -#define TSDB_DATA_TYPE_VARCHAR 15 // string -#define TSDB_DATA_TYPE_VARBINARY 16 // binary -#define TSDB_DATA_TYPE_JSON 17 // json -#define TSDB_DATA_TYPE_DECIMAL 18 // decimal -#define TSDB_DATA_TYPE_BLOB 19 // binary +#define TSDB_DATA_TYPE_NULL 0 // 1 bytes +#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes +#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte +#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes +#define TSDB_DATA_TYPE_INT 4 // 4 bytes +#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes +#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes +#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes +#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar +#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes +#define TSDB_DATA_TYPE_NCHAR 10 // unicode string +#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte +#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes +#define TSDB_DATA_TYPE_UINT 13 // 4 bytes +#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes +#define TSDB_DATA_TYPE_VARCHAR 15 // string +#define TSDB_DATA_TYPE_VARBINARY 16 // binary +#define TSDB_DATA_TYPE_JSON 17 // json +#define TSDB_DATA_TYPE_DECIMAL 18 // decimal +#define TSDB_DATA_TYPE_BLOB 19 // binary typedef enum { TSDB_OPTION_LOCALE, @@ -63,9 +63,9 @@ typedef enum { typedef enum { TSDB_SML_UNKNOWN_PROTOCOL = 0, - TSDB_SML_LINE_PROTOCOL = 1, - TSDB_SML_TELNET_PROTOCOL = 2, - TSDB_SML_JSON_PROTOCOL = 3, + TSDB_SML_LINE_PROTOCOL = 1, + TSDB_SML_TELNET_PROTOCOL = 2, + TSDB_SML_JSON_PROTOCOL = 3, } TSDB_SML_PROTOCOL_TYPE; typedef enum { @@ -79,28 +79,28 @@ typedef enum { } TSDB_SML_TIMESTAMP_TYPE; typedef struct taosField { - char name[65]; - int8_t type; - int32_t bytes; + char name[65]; + int8_t type; + int32_t bytes; } TAOS_FIELD; #ifdef _TD_GO_DLL_ - #define DLL_EXPORT __declspec(dllexport) +#define DLL_EXPORT __declspec(dllexport) #else - #define DLL_EXPORT +#define DLL_EXPORT #endif typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); typedef struct TAOS_BIND { - int buffer_type; - void * buffer; - uintptr_t buffer_length; // unused - uintptr_t *length; - int * is_null; - - int is_unsigned; // unused - int * error; // unused + int buffer_type; + void *buffer; + uintptr_t buffer_length; // unused + uintptr_t *length; + int *is_null; + + int is_unsigned; // unused + int *error; // unused union { int64_t ts; int8_t b; @@ -113,22 +113,23 @@ typedef struct TAOS_BIND { unsigned char *bin; char *nchar; } u; - unsigned int allocated; + unsigned int allocated; } TAOS_BIND; typedef struct TAOS_MULTI_BIND { - int buffer_type; - void *buffer; - uintptr_t buffer_length; - int32_t *length; - char *is_null; - int num; + int buffer_type; + void *buffer; + uintptr_t buffer_length; + int32_t *length; + char *is_null; + int num; } TAOS_MULTI_BIND; DLL_EXPORT void taos_cleanup(void); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); -DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port); +DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, + const char *db, int dbLen, uint16_t port); DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); DLL_EXPORT void taos_close(TAOS *taos); @@ -136,62 +137,63 @@ const char *taos_data_type(int type); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); -DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags); -DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name); -DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name); - -DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); -DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); -DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); -DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); -DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind); -DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx); -DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); -DLL_EXPORT char * taos_stmt_errstr(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); +DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); + +DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); +DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); +DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); +DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); +DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); +DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); +DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); +DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); +DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); -DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result -DLL_EXPORT void taos_free_result(TAOS_RES *res); -DLL_EXPORT int taos_field_count(TAOS_RES *res); -DLL_EXPORT int taos_num_fields(TAOS_RES *res); -DLL_EXPORT int taos_affected_rows(TAOS_RES *res); +DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result +DLL_EXPORT void taos_free_result(TAOS_RES *res); +DLL_EXPORT int taos_field_count(TAOS_RES *res); +DLL_EXPORT int taos_num_fields(TAOS_RES *res); +DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); -DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); -DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); -DLL_EXPORT void taos_stop_query(TAOS_RES *res); -DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); -DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); -DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); +DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); +DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); +DLL_EXPORT void taos_stop_query(TAOS_RES *res); +DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); +DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); +DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); -DLL_EXPORT int* taos_fetch_lengths(TAOS_RES *res); +DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); DLL_EXPORT const char *taos_get_server_info(TAOS *taos); DLL_EXPORT const char *taos_get_client_info(); DLL_EXPORT const char *taos_errstr(TAOS_RES *tres); -DLL_EXPORT int taos_errno(TAOS_RES *tres); +DLL_EXPORT int taos_errno(TAOS_RES *tres); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); -typedef void (*__taos_sub_fn_t)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); -DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, __taos_sub_fn_t fp, void *param, int interval); +typedef void (*__taos_sub_fn_t)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); +DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, __taos_sub_fn_t fp, + void *param, int interval); DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), - int64_t stime, void *param, void (*callback)(void *)); -DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); + int64_t stime, void *param, void (*callback)(void *)); +DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); -DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); -DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); +DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); +DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); /* --------------------------TMQ INTERFACE------------------------------- */ @@ -215,10 +217,10 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, v DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); -DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); -DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); -DLL_EXPORT void tmq_message_destroy(tmq_message_t* tmq_message); -DLL_EXPORT const char* tmq_err2str(tmq_resp_err_t); +DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); +DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); +DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message); +DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); @@ -227,7 +229,7 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics); #endif DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); -DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq); +DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); #if 0 DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups); @@ -251,8 +253,9 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); -//temporary used function for demo only -void tmqShowMsg(tmq_message_t* tmq_message); +// temporary used function for demo only +void tmqShowMsg(tmq_message_t *tmq_message); +int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); #ifdef __cplusplus } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 8b1faaef4e..315a632180 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -452,6 +452,12 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { return buf; } +int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { + if (tmq_message == NULL) return 0; + SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + return pRsp->skipLogNum; +} + void tmqShowMsg(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; -- GitLab