subscribe.c 8.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
7
#include <taos.h>  // include TDengine header file
weixin_48148422's avatar
weixin_48148422 已提交
8
#include <unistd.h>
9

10
// Print the rows available in `res` to stdout, one formatted row per line,
// then print a summary line with the number of rows handled.
//
// res        - result set to read from
// blockFetch - non-zero: obtain rows with a single taos_fetch_block() call;
//              zero: iterate with taos_fetch_row() until it returns NULL
void print_result(TAOS_RES* res, int blockFetch) {
  // NOTE(review): taos_print_row() receives no buffer length here, so this
  // presumably relies on a formatted row fitting in 256 bytes - confirm.
  char        line[256];
  TAOS_ROW    cursor = NULL;
  int         field_count = taos_num_fields(res);
  TAOS_FIELD* field_info = taos_fetch_fields(res);
  int         consumed = 0;

  if (!blockFetch) {
    // Row-by-row mode: count rows as they are printed.
    for (cursor = taos_fetch_row(res); cursor != NULL; cursor = taos_fetch_row(res)) {
      taos_print_row(line, cursor, field_info, field_count);
      puts(line);
      consumed++;
    }
  } else {
    // Block mode: the row count is whatever taos_fetch_block() returns.
    consumed = taos_fetch_block(res, &cursor);
    // NOTE(review): assumes the block can be addressed as cursor + index,
    // one entry per row - confirm against the client library's block layout.
    int idx = 0;
    while (idx < consumed) {
      taos_print_row(line, cursor + idx, field_info, field_count);
      puts(line);
      idx++;
    }
  }

  printf("%d rows consumed.\n", consumed);
}

35

36
// Callback registered for asynchronous subscriptions: prints the delivered
// result set via print_result().
//
// tsub  - subscription handle (not used here)
// res   - result set delivered to this invocation; may be NULL, in which
//         case nothing is printed except a diagnostic
// param - user pointer registered with the subscription; when non-NULL it is
//         read as an int selecting block fetch (non-zero) or row fetch (zero)
// code  - status value supplied by the caller; reported but not interpreted
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  (void)tsub;

  // main() treats a NULL result from taos_consume() as a failure; apply the
  // same guard here instead of handing a NULL result set to print_result().
  if (res == NULL) {
    printf("no result set delivered to subscription callback, code: %d\n", code);
    return;
  }

  // Fall back to row-by-row fetching when no flag was registered.
  int blockFetch = (param != NULL) ? *(int*)param : 0;
  print_result(res, blockFetch);
}


weixin_48148422's avatar
weixin_48148422 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53
// Drain `res` with taos_fetch_row(), count the rows, and report on stdout
// whether the count equals `expected`.
//
// line     - source line number to include in the report (callers pass __LINE__)
// res      - result set to drain
// expected - number of rows the caller expects
void check_row_count(int line, TAOS_RES* res, int expected) {
  int seen = 0;
  for (TAOS_ROW r = taos_fetch_row(res); r != NULL; r = taos_fetch_row(res)) {
    seen++;
  }

  if (seen == expected) {
    printf("line %d: %d rows consumed as expected\n", line, seen);
    return;
  }
  printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, seen);
}

54

weixin_48148422's avatar
weixin_48148422 已提交
55
// Self-test for the subscription API: rebuilds database `test`, loads a fixed
// data set, then walks through a series of subscribe/consume/unsubscribe
// steps, checking the number of rows each consume returns.
//
// taos - open connection used for every statement and subscription
//
// Each check prints its own source line (__LINE__) so a mismatch can be
// located in this function.
void run_test(TAOS* taos) {
  const char* topic = "test";
  const char* all_meters = "select * from meters;";
  const char* only_t0 = "select * from t0;";

  // Super table, ten child tables and the 18 initial rows, in execution order.
  const char* setup[] = {
    "create table meters(ts timestamp, a int) tags(area int);",

    "create table t0 using meters tags(0);",
    "create table t1 using meters tags(1);",
    "create table t2 using meters tags(2);",
    "create table t3 using meters tags(3);",
    "create table t4 using meters tags(4);",
    "create table t5 using meters tags(5);",
    "create table t6 using meters tags(6);",
    "create table t7 using meters tags(7);",
    "create table t8 using meters tags(8);",
    "create table t9 using meters tags(9);",

    "insert into t0 values('2020-01-01 00:00:00.000', 0);",
    "insert into t0 values('2020-01-01 00:01:00.000', 0);",
    "insert into t0 values('2020-01-01 00:02:00.000', 0);",
    "insert into t1 values('2020-01-01 00:00:00.000', 0);",
    "insert into t1 values('2020-01-01 00:01:00.000', 0);",
    "insert into t1 values('2020-01-01 00:02:00.000', 0);",
    "insert into t1 values('2020-01-01 00:03:00.000', 0);",
    "insert into t2 values('2020-01-01 00:00:00.000', 0);",
    "insert into t2 values('2020-01-01 00:01:00.000', 0);",
    "insert into t2 values('2020-01-01 00:01:01.000', 0);",
    "insert into t2 values('2020-01-01 00:01:02.000', 0);",
    "insert into t3 values('2020-01-01 00:01:02.000', 0);",
    "insert into t4 values('2020-01-01 00:01:02.000', 0);",
    "insert into t5 values('2020-01-01 00:01:02.000', 0);",
    "insert into t6 values('2020-01-01 00:01:02.000', 0);",
    "insert into t7 values('2020-01-01 00:01:02.000', 0);",
    "insert into t8 values('2020-01-01 00:01:02.000', 0);",
    "insert into t9 values('2020-01-01 00:01:02.000', 0);",
  };
  const size_t setup_count = sizeof(setup) / sizeof(setup[0]);

  // Recreate and select the database, pausing 100 ms after each statement.
  taos_query(taos, "drop database if exists test;");
  usleep(100000);
  taos_query(taos, "create database test;");
  usleep(100000);
  taos_query(taos, "use test;");
  usleep(100000);

  for (size_t i = 0; i < setup_count; i++) {
    taos_query(taos, setup[i]);
  }

  // --- super table subscription ---
  usleep(1000000);

  // Synchronous subscription (no callback): all 18 initial rows, then nothing.
  TAOS_SUB* sub = taos_subscribe(taos, 0, topic, all_meters, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 18);
  check_row_count(__LINE__, taos_consume(sub), 0);

  // Only rows inserted since the previous consume are expected.
  taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
  taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
  check_row_count(__LINE__, taos_consume(sub), 2);

  taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
  taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
  check_row_count(__LINE__, taos_consume(sub), 2);

  taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
  check_row_count(__LINE__, taos_consume(sub), 1);

  // Keep progress on unsubscribe, then subscribe with restart = 1:
  // every row inserted so far (18 + 2 + 2 + 1 + 1 = 24) is expected.
  taos_unsubscribe(sub, 1);
  taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
  sub = taos_subscribe(taos, 1, topic, all_meters, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 24);

  // Keep progress and continue the previous subscription: nothing new.
  taos_unsubscribe(sub, 1);
  sub = taos_subscribe(taos, 0, topic, all_meters, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 0);

  // Discard progress, then continue: all 24 rows are expected again.
  taos_unsubscribe(sub, 0);
  sub = taos_subscribe(taos, 0, topic, all_meters, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 24);

  // --- single table subscription ---
  // t0 holds 5 rows at this point (3 initial + 2 inserted above).
  taos_unsubscribe(sub, 0);
  sub = taos_subscribe(taos, 0, topic, only_t0, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 5);
  check_row_count(__LINE__, taos_consume(sub), 0);

  taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
  check_row_count(__LINE__, taos_consume(sub), 1);

  taos_unsubscribe(sub, 0);
}


weixin_48148422's avatar
weixin_48148422 已提交
158
int main(int argc, char *argv[]) {
159 160 161
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";
162
  const char* sql = "select * from meters;";
weixin_48148422's avatar
weixin_48148422 已提交
163
  const char* topic = "test-multiple";
164
  int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
165 166 167 168 169 170 171 172 173 174 175 176 177 178

  for (int i = 1; i < argc; i++) {
    if (strncmp(argv[i], "-h=", 3) == 0) {
      host = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-u=", 3) == 0) {
      user = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-p=", 3) == 0) {
      passwd = argv[i] + 3;
      continue;
    }
179 180 181 182 183 184
    if (strcmp(argv[i], "-sync") == 0) {
      async = 0;
      continue;
    }
    if (strcmp(argv[i], "-restart") == 0) {
      restart = 1;
185 186
      continue;
    }
187 188
    if (strcmp(argv[i], "-single") == 0) {
      sql = "select * from t0;";
weixin_48148422's avatar
weixin_48148422 已提交
189 190 191 192 193
      topic = "test-single";
      continue;
    }
    if (strcmp(argv[i], "-nokeep") == 0) {
      keep = 0;
194 195
      continue;
    }
196 197 198 199 200
    if (strncmp(argv[i], "-sql=", 5) == 0) {
      sql = argv[i] + 5;
      topic = "test-custom";
      continue;
    }
weixin_48148422's avatar
weixin_48148422 已提交
201 202 203 204
    if (strcmp(argv[i], "-test") == 0) {
      test = 1;
      continue;
    }
205 206 207 208
    if (strcmp(argv[i], "-block-fetch") == 0) {
      blockFetch = 1;
      continue;
    }
209 210
  }

weixin_48148422's avatar
weixin_48148422 已提交
211 212
  // init TAOS
  taos_init();
H
hzcheng 已提交
213

weixin_48148422's avatar
weixin_48148422 已提交
214
  TAOS* taos = taos_connect(host, user, passwd, "", 0);
weixin_48148422's avatar
weixin_48148422 已提交
215 216
  if (taos == NULL) {
    printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
H
hzcheng 已提交
217 218 219
    exit(1);
  }

weixin_48148422's avatar
weixin_48148422 已提交
220 221 222 223 224 225
  if (test) {
    run_test(taos);
    taos_close(taos);
    exit(0);
  }

weixin_48148422's avatar
weixin_48148422 已提交
226
  taos_query(taos, "use test;");
227
  TAOS_SUB* tsub = NULL;
228
  if (async) {
229 230
    // create an asynchronized subscription, the callback function will be called every 1s
    tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
231
  } else {
232
    // create an synchronized subscription, need to call 'taos_consume' manually
weixin_48148422's avatar
weixin_48148422 已提交
233
    tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
234 235 236
  }

  if (tsub == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
237 238 239
    printf("failed to create subscription.\n");
    exit(0);
  } 
H
hzcheng 已提交
240

241 242 243
  if (async) {
    getchar();
  } else while(1) {
weixin_48148422's avatar
weixin_48148422 已提交
244
    TAOS_RES* res = taos_consume(tsub);
245 246 247 248 249 250 251
    if (res == NULL) {
      printf("failed to consume data.");
      break;
    } else {
      print_result(res, blockFetch);
      getchar();
    }
H
hzcheng 已提交
252 253
  }

weixin_48148422's avatar
weixin_48148422 已提交
254
  taos_unsubscribe(tsub, keep);
weixin_48148422's avatar
weixin_48148422 已提交
255
  taos_close(taos);
H
hzcheng 已提交
256 257 258

  return 0;
}