// subscribe.c
// Sample code for the TDengine subscribe/consume API.
// To compile: gcc -o subscribe subscribe.c -ltaos

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>  // include TDengine header file
#include <unistd.h>

// Running total of rows counted by print_result; main prints it before exiting.
int nTotalRows;

// Drain one result set and report how many rows it held.
//
//   res        - result set to read
//   blockFetch - non-zero: call taos_fetch_block once and only count the rows it
//                returns (nothing is printed per row);
//                zero: fetch row by row and print every row to stdout.
//
// The row count is added to the global nTotalRows and echoed as
// "<n> rows consumed.".
void print_result(TAOS_RES* res, int blockFetch) {
  TAOS_ROW    current = NULL;
  int         fieldCount = taos_num_fields(res);
  TAOS_FIELD* fieldInfo = taos_fetch_fields(res);
  int         consumed = 0;

  if (!blockFetch) {
    for (current = taos_fetch_row(res); current != NULL; current = taos_fetch_row(res)) {
      // A fresh zero-filled buffer per row, so the printed text is always terminated.
      char line[4096] = {0};
      taos_print_row(line, current, fieldInfo, fieldCount);
      puts(line);
      consumed++;
    }
  } else {
    // Block mode: rows are counted but not printed.
    consumed = taos_fetch_block(res, &current);
  }

  nTotalRows += consumed;
  printf("%d rows consumed.\n", consumed);
}

// Subscription callback: prints the rows of each result set it is handed.
//
//   tsub  - subscription handle (not used here)
//   res   - result set to consume
//   param - user pointer supplied to taos_subscribe; main passes the address of
//           its blockFetch flag. NULL is treated as "fetch row by row".
//   code  - status passed in by the client library; a non-zero value is reported
//           and the result set is left untouched.
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  (void)tsub;

  if (code != 0 || res == NULL) {
    printf("subscription callback failed, code: %d\n", code);
    return;
  }

  int blockFetch = (param != NULL) ? *(int*)param : 0;
  print_result(res, blockFetch);
}


// Count the rows still available in `res` and report whether that count
// matches `expected`.
//
//   line     - source line of the call site (callers pass __LINE__); echoed in
//              the message so a mismatch can be located
//   res      - result set to drain
//   expected - number of rows the caller expects
void check_row_count(int line, TAOS_RES* res, int expected) {
  int fetched = 0;
  while (taos_fetch_row(res) != NULL) {
    fetched++;
  }

  if (fetched == expected) {
    printf("line %d: %d rows consumed as expected\n", line, fetched);
    return;
  }
  printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, fetched);
}

// Execute one SQL statement and discard its result set.
//
//   taos - open connection
//   sql  - statement text
//
// A failing statement is reported on stdout instead of being silently ignored;
// execution continues either way, so callers keep their fire-and-forget usage.
void do_query(TAOS* taos, const char* sql) {
  TAOS_RES* res = taos_query(taos, sql);

  int code = taos_errno(res);
  if (code != 0) {
    printf("failed to execute '%s', code: %d, reason: %s\n", sql, code, taos_errstr(res));
  }

  taos_free_result(res);
}


// Scripted check of the subscribe/consume API (selected with "-test" in main).
//
// Rebuilds database "test" with super table "meters" and child tables t0..t9,
// inserts 18 rows, then walks through a fixed series of
// subscribe / consume / unsubscribe steps. After every consume the result is
// handed to check_row_count together with the row count expected at that step.
void run_test(TAOS* taos) {
  // Database-level statements; each one is followed by a 100 ms pause.
  static const char* const dbSetup[] = {
    "drop database if exists test;",
    "create database test;",
    "use test;",
  };

  // Super table followed by its ten child tables.
  static const char* const tableSetup[] = {
    "create table meters(ts timestamp, a int) tags(area int);",
    "create table t0 using meters tags(0);",
    "create table t1 using meters tags(1);",
    "create table t2 using meters tags(2);",
    "create table t3 using meters tags(3);",
    "create table t4 using meters tags(4);",
    "create table t5 using meters tags(5);",
    "create table t6 using meters tags(6);",
    "create table t7 using meters tags(7);",
    "create table t8 using meters tags(8);",
    "create table t9 using meters tags(9);",
  };

  // Initial data set: 18 rows in total.
  static const char* const initialRows[] = {
    "insert into t0 values('2020-01-01 00:00:00.000', 0);",
    "insert into t0 values('2020-01-01 00:01:00.000', 0);",
    "insert into t0 values('2020-01-01 00:02:00.000', 0);",
    "insert into t1 values('2020-01-01 00:00:00.000', 0);",
    "insert into t1 values('2020-01-01 00:01:00.000', 0);",
    "insert into t1 values('2020-01-01 00:02:00.000', 0);",
    "insert into t1 values('2020-01-01 00:03:00.000', 0);",
    "insert into t2 values('2020-01-01 00:00:00.000', 0);",
    "insert into t2 values('2020-01-01 00:01:00.000', 0);",
    "insert into t2 values('2020-01-01 00:01:01.000', 0);",
    "insert into t2 values('2020-01-01 00:01:02.000', 0);",
    "insert into t3 values('2020-01-01 00:01:02.000', 0);",
    "insert into t4 values('2020-01-01 00:01:02.000', 0);",
    "insert into t5 values('2020-01-01 00:01:02.000', 0);",
    "insert into t6 values('2020-01-01 00:01:02.000', 0);",
    "insert into t7 values('2020-01-01 00:01:02.000', 0);",
    "insert into t8 values('2020-01-01 00:01:02.000', 0);",
    "insert into t9 values('2020-01-01 00:01:02.000', 0);",
  };

  const char* const topic = "test";
  const char* const superTableSql = "select * from meters;";
  const char* const singleTableSql = "select * from t0;";

  for (size_t i = 0; i < sizeof(dbSetup) / sizeof(dbSetup[0]); i++) {
    do_query(taos, dbSetup[i]);
    usleep(100000);
  }
  for (size_t i = 0; i < sizeof(tableSetup) / sizeof(tableSetup[0]); i++) {
    do_query(taos, tableSetup[i]);
  }
  for (size_t i = 0; i < sizeof(initialRows) / sizeof(initialRows[0]); i++) {
    do_query(taos, initialRows[i]);
  }

  // Pause for one second before subscribing.
  usleep(1000000);

  // --- super table subscription ---
  // First consume is expected to deliver all 18 rows, the next one none.
  TAOS_SUB* sub = taos_subscribe(taos, 0, topic, superTableSql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 18);
  check_row_count(__LINE__, taos_consume(sub), 0);

  // Only rows inserted since the previous consume are expected.
  do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
  do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
  check_row_count(__LINE__, taos_consume(sub), 2);

  do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
  check_row_count(__LINE__, taos_consume(sub), 2);

  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
  check_row_count(__LINE__, taos_consume(sub), 1);

  // Keep progress information, then restart the subscription:
  // every row (24 by now) is expected again.
  taos_unsubscribe(sub, 1);
  do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
  sub = taos_subscribe(taos, 1, topic, superTableSql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 24);

  // Keep progress information and continue the previous subscription:
  // nothing new is expected.
  taos_unsubscribe(sub, 1);
  sub = taos_subscribe(taos, 0, topic, superTableSql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 0);

  // Don't keep progress information and continue the previous subscription:
  // all 24 rows are expected again.
  taos_unsubscribe(sub, 0);
  sub = taos_subscribe(taos, 0, topic, superTableSql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 24);

  // --- single table subscription ---
  taos_unsubscribe(sub, 0);
  sub = taos_subscribe(taos, 0, topic, singleTableSql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 5);
  check_row_count(__LINE__, taos_consume(sub), 0);

  do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
  check_row_count(__LINE__, taos_consume(sub), 1);

  taos_unsubscribe(sub, 0);
}


int main(int argc, char *argv[]) {
166 167 168
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";
169
  const char* sql = "select * from meters;";
weixin_48148422's avatar
weixin_48148422 已提交
170
  const char* topic = "test-multiple";
171
  int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
172 173 174 175 176 177 178 179 180 181 182 183 184 185

  for (int i = 1; i < argc; i++) {
    if (strncmp(argv[i], "-h=", 3) == 0) {
      host = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-u=", 3) == 0) {
      user = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-p=", 3) == 0) {
      passwd = argv[i] + 3;
      continue;
    }
186 187 188 189 190 191
    if (strcmp(argv[i], "-sync") == 0) {
      async = 0;
      continue;
    }
    if (strcmp(argv[i], "-restart") == 0) {
      restart = 1;
192 193
      continue;
    }
194 195
    if (strcmp(argv[i], "-single") == 0) {
      sql = "select * from t0;";
weixin_48148422's avatar
weixin_48148422 已提交
196 197 198 199 200
      topic = "test-single";
      continue;
    }
    if (strcmp(argv[i], "-nokeep") == 0) {
      keep = 0;
201 202
      continue;
    }
203 204 205 206 207
    if (strncmp(argv[i], "-sql=", 5) == 0) {
      sql = argv[i] + 5;
      topic = "test-custom";
      continue;
    }
weixin_48148422's avatar
weixin_48148422 已提交
208 209 210 211
    if (strcmp(argv[i], "-test") == 0) {
      test = 1;
      continue;
    }
212 213 214 215
    if (strcmp(argv[i], "-block-fetch") == 0) {
      blockFetch = 1;
      continue;
    }
216 217
  }

weixin_48148422's avatar
weixin_48148422 已提交
218
  TAOS* taos = taos_connect(host, user, passwd, "", 0);
weixin_48148422's avatar
weixin_48148422 已提交
219 220
  if (taos == NULL) {
    printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
H
hzcheng 已提交
221 222 223
    exit(1);
  }

weixin_48148422's avatar
weixin_48148422 已提交
224 225 226 227 228 229
  if (test) {
    run_test(taos);
    taos_close(taos);
    exit(0);
  }

B
Bomin Zhang 已提交
230
  taos_select_db(taos, "test");
231
  TAOS_SUB* tsub = NULL;
232
  if (async) {
233 234
    // create an asynchronized subscription, the callback function will be called every 1s
    tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
235
  } else {
236
    // create an synchronized subscription, need to call 'taos_consume' manually
weixin_48148422's avatar
weixin_48148422 已提交
237
    tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
238 239 240
  }

  if (tsub == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
241 242 243
    printf("failed to create subscription.\n");
    exit(0);
  } 
H
hzcheng 已提交
244

245 246 247
  if (async) {
    getchar();
  } else while(1) {
weixin_48148422's avatar
weixin_48148422 已提交
248
    TAOS_RES* res = taos_consume(tsub);
249 250 251 252 253 254 255
    if (res == NULL) {
      printf("failed to consume data.");
      break;
    } else {
      print_result(res, blockFetch);
      getchar();
    }
H
hzcheng 已提交
256 257
  }

B
Bomin Zhang 已提交
258
  printf("total rows consumed: %d\n", nTotalRows);
weixin_48148422's avatar
weixin_48148422 已提交
259
  taos_unsubscribe(tsub, keep);
weixin_48148422's avatar
weixin_48148422 已提交
260
  taos_close(taos);
H
hzcheng 已提交
261 262 263

  return 0;
}