subscribe.c 8.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
7
#include <taos.h>  // include TDengine header file
weixin_48148422's avatar
weixin_48148422 已提交
8
#include <unistd.h>
9

B
Bomin Zhang 已提交
10 11
int nTotalRows;

12
void print_result(TAOS_RES* res, int blockFetch) {
weixin_48148422's avatar
weixin_48148422 已提交
13
  TAOS_ROW    row = NULL;
14 15
  int         num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
weixin_48148422's avatar
weixin_48148422 已提交
16
  int         nRows = 0;
Z
zhaoyanggh 已提交
17

18
  if (blockFetch) {
weixin_48148422's avatar
weixin_48148422 已提交
19
    nRows = taos_fetch_block(res, &row);
Z
zhaoyanggh 已提交
20
    // for (int i = 0; i < nRows; i++) {
21 22 23
    //  taos_print_row(buf, row + i, fields, num_fields);
    //  puts(buf);
    //}
24 25
  } else {
    while ((row = taos_fetch_row(res))) {
J
jiaoqiyuan 已提交
26
      char buf[4096] = {0};
B
Bomin Zhang 已提交
27 28
      taos_print_row(buf, row, fields, num_fields);
      puts(buf);
weixin_48148422's avatar
weixin_48148422 已提交
29
      nRows++;
30
    }
31
  }
weixin_48148422's avatar
weixin_48148422 已提交
32

B
Bomin Zhang 已提交
33
  nTotalRows += nRows;
weixin_48148422's avatar
weixin_48148422 已提交
34
  printf("%d rows consumed.\n", nRows);
35 36
}

Z
zhaoyanggh 已提交
37
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) { print_result(res, *(int*)param); }
38

weixin_48148422's avatar
weixin_48148422 已提交
39
void check_row_count(int line, TAOS_RES* res, int expected) {
Z
zhaoyanggh 已提交
40 41
  int      actual = 0;
  TAOS_ROW row;
weixin_48148422's avatar
weixin_48148422 已提交
42 43 44 45 46 47 48 49 50 51
  while ((row = taos_fetch_row(res))) {
    actual++;
  }
  if (actual != expected) {
    printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual);
  } else {
    printf("line %d: %d rows consumed as expected\n", line, actual);
  }
}

B
Bomin Zhang 已提交
52
void do_query(TAOS* taos, const char* sql) {
B
Bomin Zhang 已提交
53
  TAOS_RES* res = taos_query(taos, sql);
B
Bomin Zhang 已提交
54 55 56
  taos_free_result(res);
}

weixin_48148422's avatar
weixin_48148422 已提交
57
void run_test(TAOS* taos) {
B
Bomin Zhang 已提交
58
  do_query(taos, "drop database if exists test;");
Z
zhaoyanggh 已提交
59

weixin_48148422's avatar
weixin_48148422 已提交
60
  usleep(100000);
B
Bomin Zhang 已提交
61
  do_query(taos, "create database test;");
weixin_48148422's avatar
weixin_48148422 已提交
62
  usleep(100000);
B
Bomin Zhang 已提交
63
  do_query(taos, "use test;");
weixin_48148422's avatar
weixin_48148422 已提交
64

weixin_48148422's avatar
weixin_48148422 已提交
65
  usleep(100000);
B
Bomin Zhang 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
  do_query(taos, "create table meters(ts timestamp, a int) tags(area int);");

  do_query(taos, "create table t0 using meters tags(0);");
  do_query(taos, "create table t1 using meters tags(1);");
  do_query(taos, "create table t2 using meters tags(2);");
  do_query(taos, "create table t3 using meters tags(3);");
  do_query(taos, "create table t4 using meters tags(4);");
  do_query(taos, "create table t5 using meters tags(5);");
  do_query(taos, "create table t6 using meters tags(6);");
  do_query(taos, "create table t7 using meters tags(7);");
  do_query(taos, "create table t8 using meters tags(8);");
  do_query(taos, "create table t9 using meters tags(9);");

  do_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
  do_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
  do_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
  do_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
  do_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
  do_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
  do_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");
weixin_48148422's avatar
weixin_48148422 已提交
97 98

  // super tables subscription
weixin_48148422's avatar
weixin_48148422 已提交
99
  usleep(1000000);
weixin_48148422's avatar
weixin_48148422 已提交
100 101 102

  TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  TAOS_RES* res = taos_consume(tsub);
103
  check_row_count(__LINE__, res, 18);
weixin_48148422's avatar
weixin_48148422 已提交
104 105 106 107

  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

B
Bomin Zhang 已提交
108 109
  do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
  do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
weixin_48148422's avatar
weixin_48148422 已提交
110 111 112
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

B
Bomin Zhang 已提交
113 114
  do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
115 116 117
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

B
Bomin Zhang 已提交
118
  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
weixin_48148422's avatar
weixin_48148422 已提交
119 120 121
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

122
  // keep progress information and restart subscription
weixin_48148422's avatar
weixin_48148422 已提交
123
  taos_unsubscribe(tsub, 1);
B
Bomin Zhang 已提交
124
  do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
weixin_48148422's avatar
weixin_48148422 已提交
125 126
  tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
127
  check_row_count(__LINE__, res, 24);
128 129 130 131 132 133

  // keep progress information and continue previous subscription
  taos_unsubscribe(tsub, 1);
  tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);
weixin_48148422's avatar
weixin_48148422 已提交
134 135 136 137 138

  // don't keep progress information and continue previous subscription
  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
139
  check_row_count(__LINE__, res, 24);
weixin_48148422's avatar
weixin_48148422 已提交
140 141 142 143 144 145

  // single meter subscription

  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
  res = taos_consume(tsub);
146
  check_row_count(__LINE__, res, 5);
weixin_48148422's avatar
weixin_48148422 已提交
147 148 149 150

  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

B
Bomin Zhang 已提交
151
  do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
weixin_48148422's avatar
weixin_48148422 已提交
152 153 154 155 156 157
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

  taos_unsubscribe(tsub, 0);
}

Z
zhaoyanggh 已提交
158
int main(int argc, char* argv[]) {
159 160 161
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";
162
  const char* sql = "select * from meters;";
weixin_48148422's avatar
weixin_48148422 已提交
163
  const char* topic = "test-multiple";
Z
zhaoyanggh 已提交
164
  int         async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
165 166 167 168 169 170 171 172 173 174 175 176 177 178

  for (int i = 1; i < argc; i++) {
    if (strncmp(argv[i], "-h=", 3) == 0) {
      host = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-u=", 3) == 0) {
      user = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-p=", 3) == 0) {
      passwd = argv[i] + 3;
      continue;
    }
179 180 181 182 183 184
    if (strcmp(argv[i], "-sync") == 0) {
      async = 0;
      continue;
    }
    if (strcmp(argv[i], "-restart") == 0) {
      restart = 1;
185 186
      continue;
    }
187 188
    if (strcmp(argv[i], "-single") == 0) {
      sql = "select * from t0;";
weixin_48148422's avatar
weixin_48148422 已提交
189 190 191 192 193
      topic = "test-single";
      continue;
    }
    if (strcmp(argv[i], "-nokeep") == 0) {
      keep = 0;
194 195
      continue;
    }
196 197 198 199 200
    if (strncmp(argv[i], "-sql=", 5) == 0) {
      sql = argv[i] + 5;
      topic = "test-custom";
      continue;
    }
weixin_48148422's avatar
weixin_48148422 已提交
201 202 203 204
    if (strcmp(argv[i], "-test") == 0) {
      test = 1;
      continue;
    }
205 206 207 208
    if (strcmp(argv[i], "-block-fetch") == 0) {
      blockFetch = 1;
      continue;
    }
209 210
  }

weixin_48148422's avatar
weixin_48148422 已提交
211
  TAOS* taos = taos_connect(host, user, passwd, "", 0);
weixin_48148422's avatar
weixin_48148422 已提交
212 213
  if (taos == NULL) {
    printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
H
hzcheng 已提交
214 215 216
    exit(1);
  }

weixin_48148422's avatar
weixin_48148422 已提交
217 218 219 220 221 222
  if (test) {
    run_test(taos);
    taos_close(taos);
    exit(0);
  }

B
Bomin Zhang 已提交
223
  taos_select_db(taos, "test");
224
  TAOS_SUB* tsub = NULL;
225
  if (async) {
226 227
    // create an asynchronized subscription, the callback function will be called every 1s
    tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
228
  } else {
229
    // create an synchronized subscription, need to call 'taos_consume' manually
weixin_48148422's avatar
weixin_48148422 已提交
230
    tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
231 232 233
  }

  if (tsub == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
234 235
    printf("failed to create subscription.\n");
    exit(0);
Z
zhaoyanggh 已提交
236
  }
H
hzcheng 已提交
237

238 239
  if (async) {
    getchar();
Z
zhaoyanggh 已提交
240 241 242 243 244 245 246 247 248 249
  } else
    while (1) {
      TAOS_RES* res = taos_consume(tsub);
      if (res == NULL) {
        printf("failed to consume data.");
        break;
      } else {
        print_result(res, blockFetch);
        getchar();
      }
250
    }
H
hzcheng 已提交
251

B
Bomin Zhang 已提交
252
  printf("total rows consumed: %d\n", nTotalRows);
weixin_48148422's avatar
weixin_48148422 已提交
253
  taos_unsubscribe(tsub, keep);
weixin_48148422's avatar
weixin_48148422 已提交
254
  taos_close(taos);
H
hzcheng 已提交
255 256 257

  return 0;
}