subscribe.c 8.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
7
#include <taos.h>  // include TDengine header file
weixin_48148422's avatar
weixin_48148422 已提交
8
#include <unistd.h>
9

B
Bomin Zhang 已提交
10 11
// Running total of rows seen by print_result(); main() prints it before shutting down.
int nTotalRows;

12
/*
 * Consume one result set and report how many rows it contained.
 *
 * res        - result set to read from.
 * blockFetch - non-zero: fetch a single block with taos_fetch_block() and only
 *              count its rows (nothing is printed per row);
 *              zero: fetch row by row, printing every row to stdout.
 *
 * The row count is added to the global nTotalRows and also printed.
 */
void print_result(TAOS_RES* res, int blockFetch) {
  char        line[4096];
  TAOS_ROW    cursor = NULL;
  int         consumed = 0;
  int         fieldCount = taos_num_fields(res);
  TAOS_FIELD* fieldInfo = taos_fetch_fields(res);

  if (!blockFetch) {
    // Row-at-a-time mode: format and echo each row until the result is exhausted.
    for (;;) {
      cursor = taos_fetch_row(res);
      if (!cursor) {
        break;
      }
      taos_print_row(line, cursor, fieldInfo, fieldCount);
      puts(line);
      consumed += 1;
    }
  } else {
    // Block mode: one fetch, the return value is taken as the row count.
    consumed = taos_fetch_block(res, &cursor);
  }

  nTotalRows += consumed;
  printf("%d rows consumed.\n", consumed);
}

38

39
/*
 * Callback handed to taos_subscribe() for asynchronous subscriptions.
 *
 * param points at an int holding the block-fetch flag (main() passes
 * &blockFetch); the result set is forwarded to print_result() with that flag.
 * tsub and code are not used.
 */
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  const int* blockFetchFlag = (const int*)param;

  (void)tsub;
  (void)code;
  print_result(res, *blockFetchFlag);
}


weixin_48148422's avatar
weixin_48148422 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56
/*
 * Count the rows remaining in `res` and compare against `expected`.
 *
 * line     - source line of the call site (callers pass __LINE__), echoed in the report.
 * res      - result set to drain with taos_fetch_row().
 * expected - number of rows the caller expects.
 *
 * Prints either a confirmation or a mismatch message to stdout.
 */
void check_row_count(int line, TAOS_RES* res, int expected) {
  int fetched = 0;

  while (taos_fetch_row(res)) {
    ++fetched;
  }

  if (fetched == expected) {
    printf("line %d: %d rows consumed as expected\n", line, fetched);
    return;
  }
  printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, fetched);
}

57

B
Bomin Zhang 已提交
58
/*
 * Run one SQL statement on `taos` and discard its result.
 * The result handle is released immediately; no error checking is done.
 */
void do_query(TAOS* taos, const char* sql) {
  taos_free_result(taos_query(taos, sql));
}


weixin_48148422's avatar
weixin_48148422 已提交
64
/*
 * Self-test of the subscribe/consume API.
 *
 * Rebuilds database `test` with super table `meters` and child tables t0..t9,
 * loads 18 rows, then walks through a series of subscribe / consume /
 * unsubscribe steps, reporting via check_row_count() whether each consume
 * returned the expected number of rows.
 */
void run_test(TAOS* taos) {
  // Database bootstrap; every statement is followed by a 100 ms pause.
  static const char* const bootstrap[] = {
    "drop database if exists test;",
    "create database test;",
    "use test;",
  };

  // Schema and initial data, executed back to back in this order.
  static const char* const fixture[] = {
    "create table meters(ts timestamp, a int) tags(area int);",

    "create table t0 using meters tags(0);",
    "create table t1 using meters tags(1);",
    "create table t2 using meters tags(2);",
    "create table t3 using meters tags(3);",
    "create table t4 using meters tags(4);",
    "create table t5 using meters tags(5);",
    "create table t6 using meters tags(6);",
    "create table t7 using meters tags(7);",
    "create table t8 using meters tags(8);",
    "create table t9 using meters tags(9);",

    "insert into t0 values('2020-01-01 00:00:00.000', 0);",
    "insert into t0 values('2020-01-01 00:01:00.000', 0);",
    "insert into t0 values('2020-01-01 00:02:00.000', 0);",
    "insert into t1 values('2020-01-01 00:00:00.000', 0);",
    "insert into t1 values('2020-01-01 00:01:00.000', 0);",
    "insert into t1 values('2020-01-01 00:02:00.000', 0);",
    "insert into t1 values('2020-01-01 00:03:00.000', 0);",
    "insert into t2 values('2020-01-01 00:00:00.000', 0);",
    "insert into t2 values('2020-01-01 00:01:00.000', 0);",
    "insert into t2 values('2020-01-01 00:01:01.000', 0);",
    "insert into t2 values('2020-01-01 00:01:02.000', 0);",
    "insert into t3 values('2020-01-01 00:01:02.000', 0);",
    "insert into t4 values('2020-01-01 00:01:02.000', 0);",
    "insert into t5 values('2020-01-01 00:01:02.000', 0);",
    "insert into t6 values('2020-01-01 00:01:02.000', 0);",
    "insert into t7 values('2020-01-01 00:01:02.000', 0);",
    "insert into t8 values('2020-01-01 00:01:02.000', 0);",
    "insert into t9 values('2020-01-01 00:01:02.000', 0);",
  };

  const int bootstrapCount = (int)(sizeof(bootstrap) / sizeof(bootstrap[0]));
  const int fixtureCount = (int)(sizeof(fixture) / sizeof(fixture[0]));

  for (int step = 0; step < bootstrapCount; step++) {
    do_query(taos, bootstrap[step]);
    usleep(100000);
  }

  for (int step = 0; step < fixtureCount; step++) {
    do_query(taos, fixture[step]);
  }

  // super tables subscription
  usleep(1000000);

  TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  TAOS_RES* res = taos_consume(tsub);
  check_row_count(__LINE__, res, 18);

  // nothing new was written, so a second consume is expected to be empty
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

  do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
  do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

  do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

  // keep progress information and restart subscription
  taos_unsubscribe(tsub, 1);
  do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
  tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 24);

  // keep progress information and continue previous subscription
  taos_unsubscribe(tsub, 1);
  tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

  // don't keep progress information and continue previous subscription
  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 24);

  // single meter subscription
  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 5);

  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

  do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

  taos_unsubscribe(tsub, 0);
}


weixin_48148422's avatar
weixin_48148422 已提交
166
int main(int argc, char *argv[]) {
167 168 169
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";
170
  const char* sql = "select * from meters;";
weixin_48148422's avatar
weixin_48148422 已提交
171
  const char* topic = "test-multiple";
172
  int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
173 174 175 176 177 178 179 180 181 182 183 184 185 186

  for (int i = 1; i < argc; i++) {
    if (strncmp(argv[i], "-h=", 3) == 0) {
      host = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-u=", 3) == 0) {
      user = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-p=", 3) == 0) {
      passwd = argv[i] + 3;
      continue;
    }
187 188 189 190 191 192
    if (strcmp(argv[i], "-sync") == 0) {
      async = 0;
      continue;
    }
    if (strcmp(argv[i], "-restart") == 0) {
      restart = 1;
193 194
      continue;
    }
195 196
    if (strcmp(argv[i], "-single") == 0) {
      sql = "select * from t0;";
weixin_48148422's avatar
weixin_48148422 已提交
197 198 199 200 201
      topic = "test-single";
      continue;
    }
    if (strcmp(argv[i], "-nokeep") == 0) {
      keep = 0;
202 203
      continue;
    }
204 205 206 207 208
    if (strncmp(argv[i], "-sql=", 5) == 0) {
      sql = argv[i] + 5;
      topic = "test-custom";
      continue;
    }
weixin_48148422's avatar
weixin_48148422 已提交
209 210 211 212
    if (strcmp(argv[i], "-test") == 0) {
      test = 1;
      continue;
    }
213 214 215 216
    if (strcmp(argv[i], "-block-fetch") == 0) {
      blockFetch = 1;
      continue;
    }
217 218
  }

weixin_48148422's avatar
weixin_48148422 已提交
219 220
  // init TAOS
  taos_init();
H
hzcheng 已提交
221

weixin_48148422's avatar
weixin_48148422 已提交
222
  TAOS* taos = taos_connect(host, user, passwd, "", 0);
weixin_48148422's avatar
weixin_48148422 已提交
223 224
  if (taos == NULL) {
    printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
H
hzcheng 已提交
225 226 227
    exit(1);
  }

weixin_48148422's avatar
weixin_48148422 已提交
228 229 230 231 232 233
  if (test) {
    run_test(taos);
    taos_close(taos);
    exit(0);
  }

B
Bomin Zhang 已提交
234
  taos_select_db(taos, "test");
235
  TAOS_SUB* tsub = NULL;
236
  if (async) {
237 238
    // create an asynchronized subscription, the callback function will be called every 1s
    tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
239
  } else {
240
    // create an synchronized subscription, need to call 'taos_consume' manually
weixin_48148422's avatar
weixin_48148422 已提交
241
    tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
242 243 244
  }

  if (tsub == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
245 246 247
    printf("failed to create subscription.\n");
    exit(0);
  } 
H
hzcheng 已提交
248

249 250 251
  if (async) {
    getchar();
  } else while(1) {
weixin_48148422's avatar
weixin_48148422 已提交
252
    TAOS_RES* res = taos_consume(tsub);
253 254 255 256 257 258 259
    if (res == NULL) {
      printf("failed to consume data.");
      break;
    } else {
      print_result(res, blockFetch);
      getchar();
    }
H
hzcheng 已提交
260 261
  }

B
Bomin Zhang 已提交
262
  printf("total rows consumed: %d\n", nTotalRows);
weixin_48148422's avatar
weixin_48148422 已提交
263
  taos_unsubscribe(tsub, keep);
weixin_48148422's avatar
weixin_48148422 已提交
264
  taos_close(taos);
H
hzcheng 已提交
265 266 267

  return 0;
}