From 612749e85af333cf535f62a02b3341262489c9bc Mon Sep 17 00:00:00 2001 From: tickduan <417921451@qq.com> Date: Fri, 9 Jul 2021 09:44:51 +0800 Subject: [PATCH] subscribe.c is modify by self restore with develop --- tests/examples/c/subscribe.c | 342 ++++++++++++++++++++++++----------- 1 file changed, 232 insertions(+), 110 deletions(-) diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index e158aa2da9..ad12f0e7a5 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -7,135 +7,257 @@ #include // include TDengine header file #include +int nTotalRows; -void showme(); -float calculate_delta_t(size_t size); -int is_lossless_compressed_data(unsigned char* compressedBytes, size_t cmpSize); - -#include -#include -#include -#include -#include // TAOS header file - -static void queryDB(TAOS *taos, char *command) { - - printf("aaa"); - /* - int i; - TAOS_RES *pSql = NULL; - int32_t code = -1; - - for (i = 0; i < 5; i++) { - if (NULL != pSql) { - taos_free_result(pSql); - pSql = NULL; - } - - pSql = taos_query(taos, command); - code = taos_errno(pSql); - if (0 == code) { - break; +void print_result(TAOS_RES* res, int blockFetch) { + TAOS_ROW row = NULL; + int num_fields = taos_num_fields(res); + TAOS_FIELD* fields = taos_fetch_fields(res); + int nRows = 0; + + if (blockFetch) { + nRows = taos_fetch_block(res, &row); + //for (int i = 0; i < nRows; i++) { + // taos_print_row(buf, row + i, fields, num_fields); + // puts(buf); + //} + } else { + while ((row = taos_fetch_row(res))) { + char buf[4096] = {0}; + taos_print_row(buf, row, fields, num_fields); + puts(buf); + nRows++; } } - if (code != 0) { - fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql)); - taos_free_result(pSql); - taos_close(taos); - exit(EXIT_FAILURE); + nTotalRows += nRows; + printf("%d rows consumed.\n", nRows); +} + + +void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { + print_result(res, *(int*)param); +} + + +void check_row_count(int line, TAOS_RES* res, int expected) { + int actual = 0; + TAOS_ROW row; + while ((row = taos_fetch_row(res))) { + actual++; } + if (actual != expected) { + printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual); + } else { + printf("line %d: %d rows consumed as expected\n", line, actual); + } +} + + +void do_query(TAOS* taos, const char* sql) { + TAOS_RES* res = taos_query(taos, sql); + taos_free_result(res); +} + + +void run_test(TAOS* taos) { + do_query(taos, "drop database if exists test;"); + + usleep(100000); + do_query(taos, "create database test;"); + usleep(100000); + do_query(taos, "use test;"); + + usleep(100000); + do_query(taos, "create table meters(ts timestamp, a int) tags(area int);"); + + do_query(taos, "create table t0 using meters tags(0);"); + do_query(taos, "create table t1 using meters tags(1);"); + do_query(taos, "create table t2 using meters tags(2);"); + do_query(taos, "create table t3 using meters tags(3);"); + do_query(taos, "create table t4 using meters tags(4);"); + do_query(taos, "create table t5 using meters tags(5);"); + do_query(taos, "create table t6 using meters tags(6);"); + do_query(taos, "create table t7 using meters tags(7);"); + do_query(taos, "create table t8 using meters tags(8);"); + do_query(taos, "create table t9 using meters tags(9);"); + + do_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);"); + do_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);"); + do_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);"); + do_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);"); + do_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);"); + do_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);"); + do_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);"); + + // super tables subscription + usleep(1000000); + + TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); + TAOS_RES* res = taos_consume(tsub); + check_row_count(__LINE__, res, 18); + + res = taos_consume(tsub); + check_row_count(__LINE__, res, 0); + + do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);"); + do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);"); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 2); + + do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);"); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 2); + + do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);"); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 1); + + // keep progress information and restart subscription + taos_unsubscribe(tsub, 1); + do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);"); + tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 24); - taos_free_result(pSql); - */ + // keep progress information and continue previous subscription + taos_unsubscribe(tsub, 1); + tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 0); + + // don't keep progress information and continue previous subscription + taos_unsubscribe(tsub, 0); + tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 24); + + // single meter subscription + + taos_unsubscribe(tsub, 0); + tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 5); + + res = taos_consume(tsub); + check_row_count(__LINE__, res, 0); + + do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);"); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 1); + + taos_unsubscribe(tsub, 0); } -void Test(TAOS *taos, char *qstr, int i); int main(int argc, char *argv[]) { - //char qstr[1024]; - - is_lossless_compressed_data(NULL,0); - - // connect to server - if (argc < 2) { - printf("please input server-ip \n"); - return 0; + const char* host = "127.0.0.1"; + const char* user = "root"; + const char* passwd = "taosdata"; + const char* sql = "select * from meters;"; + const char* topic = "test-multiple"; + int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0; + + for (int i = 1; i < argc; i++) { + if (strncmp(argv[i], "-h=", 3) == 0) { + host = argv[i] + 3; + continue; + } + if (strncmp(argv[i], "-u=", 3) == 0) { + user = argv[i] + 3; + continue; + } + if (strncmp(argv[i], "-p=", 3) == 0) { + passwd = argv[i] + 3; + continue; + } + if (strcmp(argv[i], "-sync") == 0) { + async = 0; + continue; + } + if (strcmp(argv[i], "-restart") == 0) { + restart = 1; + continue; + } + if (strcmp(argv[i], "-single") == 0) { + sql = "select * from t0;"; + topic = "test-single"; + continue; + } + if (strcmp(argv[i], "-nokeep") == 0) { + keep = 0; + continue; + } + if (strncmp(argv[i], "-sql=", 5) == 0) { + sql = argv[i] + 5; + topic = "test-custom"; + continue; + } + if (strcmp(argv[i], "-test") == 0) { + test = 1; + continue; + } + if (strcmp(argv[i], "-block-fetch") == 0) { + blockFetch = 1; + continue; + } } - TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); + TAOS* taos = taos_connect(host, user, passwd, "", 0); if (taos == NULL) { - printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/); + printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); exit(1); } - /* - for (int i = 0; i < 100; i++) { - Test(taos, qstr, i); + + if (test) { + run_test(taos); + taos_close(taos); + exit(0); } - taos_close(taos); - taos_cleanup(); - */ -} -void Test(TAOS *taos, char *qstr, int index) { - - printf("==================test at %d\n================================", index); - - queryDB(taos, "drop database if exists demo"); - queryDB(taos, "create database demo"); - //TAOS_RES *result; - queryDB(taos, "use demo"); - - queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))"); - printf("success to create table\n"); - - /* - int i = 0; - for (i = 0; i < 10; ++i) { - sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello"); - printf("qstr: %s\n", qstr); - - // note: how do you wanna do if taos_query returns non-NULL - // if (taos_query(taos, qstr)) { - // printf("insert row: %i, reason:%s\n", i, taos_errstr(taos)); - // } - TAOS_RES *result1 = taos_query(taos, qstr); - if (result1 == NULL || taos_errno(result1) != 0) { - printf("failed to insert row, reason:%s\n", taos_errstr(result1)); - taos_free_result(result1); - exit(1); + + taos_select_db(taos, "test"); + TAOS_SUB* tsub = NULL; + if (async) { + // create an asynchronized subscription, the callback function will be called every 1s + tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000); + } else { + // create an synchronized subscription, need to call 'taos_consume' manually + tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0); + } + + if (tsub == NULL) { + printf("failed to create subscription.\n"); + exit(0); + } + + if (async) { + getchar(); + } else while(1) { + TAOS_RES* res = taos_consume(tsub); + if (res == NULL) { + printf("failed to consume data."); + break; } else { - printf("insert row: %i\n", i); + print_result(res, blockFetch); + getchar(); } - taos_free_result(result1); - } - printf("success to insert rows, total %d rows\n", i); - - // query the records - sprintf(qstr, "SELECT * FROM m1"); - result = taos_query(taos, qstr); - if (result == NULL || taos_errno(result) != 0) { - printf("failed to select, reason:%s\n", taos_errstr(result)); - taos_free_result(result); - exit(1); } - TAOS_ROW row; - int rows = 0; - int num_fields = taos_field_count(result); - TAOS_FIELD *fields = taos_fetch_fields(result); - - printf("num_fields = %d\n", num_fields); - printf("select * from table, result:\n"); - // fetch the records row by row - while ((row = taos_fetch_row(result))) { - char temp[1024] = {0}; - rows++; - taos_print_row(temp, row, fields, num_fields); - printf("%s\n", temp); - } + printf("total rows consumed: %d\n", nTotalRows); + taos_unsubscribe(tsub, keep); + taos_close(taos); - taos_free_result(result); - printf("====demo end====\n\n"); - */ + return 0; } - -- GitLab