subscribe.c 8.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
7
#include <taos.h>  // include TDengine header file
weixin_48148422's avatar
weixin_48148422 已提交
8
#include <unistd.h>
9

10
void print_result(TAOS_RES* res, int blockFetch) {
weixin_48148422's avatar
weixin_48148422 已提交
11
  TAOS_ROW    row = NULL;
12 13
  int         num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
weixin_48148422's avatar
weixin_48148422 已提交
14
  int         nRows = 0;
15

16
  if (blockFetch) {
weixin_48148422's avatar
weixin_48148422 已提交
17
    nRows = taos_fetch_block(res, &row);
18 19 20 21 22 23 24 25 26 27
    for (int i = 0; i < nRows; i++) {
      char temp[256];
      taos_print_row(temp, row + i, fields, num_fields);
      puts(temp);
    }
  } else {
    while ((row = taos_fetch_row(res))) {
      char temp[256];
      taos_print_row(temp, row, fields, num_fields);
      puts(temp);
weixin_48148422's avatar
weixin_48148422 已提交
28
      nRows++;
29
    }
30
  }
weixin_48148422's avatar
weixin_48148422 已提交
31 32

  printf("%d rows consumed.\n", nRows);
33 34
}

35

36
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
37
  print_result(res, *(int*)param);
38 39 40
}


weixin_48148422's avatar
weixin_48148422 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53
void check_row_count(int line, TAOS_RES* res, int expected) {
  int actual = 0;
  TAOS_ROW    row;
  while ((row = taos_fetch_row(res))) {
    actual++;
  }
  if (actual != expected) {
    printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual);
  } else {
    printf("line %d: %d rows consumed as expected\n", line, actual);
  }
}

54

weixin_48148422's avatar
weixin_48148422 已提交
55
void run_test(TAOS* taos) {
56
  taos_query(taos, "drop database if exists test;");
weixin_48148422's avatar
weixin_48148422 已提交
57 58
  
  usleep(100000);
59
  taos_query(taos, "create database test tables 5;");
weixin_48148422's avatar
weixin_48148422 已提交
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
  usleep(100000);
  taos_query(taos, "use test;");
  usleep(100000);
  taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);");

  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');");
76 77 78 79 80 81 82
  taos_query(taos, "insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');");
weixin_48148422's avatar
weixin_48148422 已提交
83 84 85 86 87

  // super tables subscription

  TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  TAOS_RES* res = taos_consume(tsub);
88
  check_row_count(__LINE__, res, 18);
weixin_48148422's avatar
weixin_48148422 已提交
89 90 91 92

  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

93 94
  taos_query(taos, "insert into t0 values('2020-01-01 00:03:00.000', 0, 'china');");
  taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0, 'china');");
weixin_48148422's avatar
weixin_48148422 已提交
95 96 97
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

98 99 100 101 102 103
  taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0, 'UK');");
  taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0, 'UK');");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

  taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0, 'china');");
weixin_48148422's avatar
weixin_48148422 已提交
104 105 106
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

107
  // keep progress information and restart subscription
weixin_48148422's avatar
weixin_48148422 已提交
108
  taos_unsubscribe(tsub, 1);
109
  taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0, 'china');");
weixin_48148422's avatar
weixin_48148422 已提交
110 111
  tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
112
  check_row_count(__LINE__, res, 24);
113 114 115 116 117 118

  // keep progress information and continue previous subscription
  taos_unsubscribe(tsub, 1);
  tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);
weixin_48148422's avatar
weixin_48148422 已提交
119 120 121 122 123

  // don't keep progress information and continue previous subscription
  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
124
  check_row_count(__LINE__, res, 24);
weixin_48148422's avatar
weixin_48148422 已提交
125 126 127 128 129 130

  // single meter subscription

  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
  res = taos_consume(tsub);
131
  check_row_count(__LINE__, res, 5);
weixin_48148422's avatar
weixin_48148422 已提交
132 133 134 135

  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

136
  taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0, 'china');");
weixin_48148422's avatar
weixin_48148422 已提交
137 138 139 140 141 142 143
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

  taos_unsubscribe(tsub, 0);
}


weixin_48148422's avatar
weixin_48148422 已提交
144
int main(int argc, char *argv[]) {
145 146 147
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";
148
  const char* sql = "select * from meters;";
weixin_48148422's avatar
weixin_48148422 已提交
149
  const char* topic = "test-multiple";
150
  int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
151 152 153 154 155 156 157 158 159 160 161 162 163 164

  for (int i = 1; i < argc; i++) {
    if (strncmp(argv[i], "-h=", 3) == 0) {
      host = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-u=", 3) == 0) {
      user = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-p=", 3) == 0) {
      passwd = argv[i] + 3;
      continue;
    }
165 166 167 168 169 170
    if (strcmp(argv[i], "-sync") == 0) {
      async = 0;
      continue;
    }
    if (strcmp(argv[i], "-restart") == 0) {
      restart = 1;
171 172
      continue;
    }
173 174
    if (strcmp(argv[i], "-single") == 0) {
      sql = "select * from t0;";
weixin_48148422's avatar
weixin_48148422 已提交
175 176 177 178 179
      topic = "test-single";
      continue;
    }
    if (strcmp(argv[i], "-nokeep") == 0) {
      keep = 0;
180 181
      continue;
    }
182 183 184 185 186
    if (strncmp(argv[i], "-sql=", 5) == 0) {
      sql = argv[i] + 5;
      topic = "test-custom";
      continue;
    }
weixin_48148422's avatar
weixin_48148422 已提交
187 188 189 190
    if (strcmp(argv[i], "-test") == 0) {
      test = 1;
      continue;
    }
191 192 193 194
    if (strcmp(argv[i], "-block-fetch") == 0) {
      blockFetch = 1;
      continue;
    }
195 196
  }

weixin_48148422's avatar
weixin_48148422 已提交
197 198
  // init TAOS
  taos_init();
H
hzcheng 已提交
199

200
  TAOS* taos = taos_connect(host, user, passwd, "test", 0);
weixin_48148422's avatar
weixin_48148422 已提交
201 202
  if (taos == NULL) {
    printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
H
hzcheng 已提交
203 204 205
    exit(1);
  }

weixin_48148422's avatar
weixin_48148422 已提交
206 207 208 209 210 211
  if (test) {
    run_test(taos);
    taos_close(taos);
    exit(0);
  }

212
  TAOS_SUB* tsub = NULL;
213
  if (async) {
214 215
    // create an asynchronized subscription, the callback function will be called every 1s
    tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
216
  } else {
217
    // create an synchronized subscription, need to call 'taos_consume' manually
weixin_48148422's avatar
weixin_48148422 已提交
218
    tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
219 220 221
  }

  if (tsub == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
222 223 224
    printf("failed to create subscription.\n");
    exit(0);
  } 
H
hzcheng 已提交
225

226 227 228
  if (async) {
    getchar();
  } else while(1) {
weixin_48148422's avatar
weixin_48148422 已提交
229
    TAOS_RES* res = taos_consume(tsub);
230 231 232 233 234 235 236
    if (res == NULL) {
      printf("failed to consume data.");
      break;
    } else {
      print_result(res, blockFetch);
      getchar();
    }
H
hzcheng 已提交
237 238
  }

weixin_48148422's avatar
weixin_48148422 已提交
239
  taos_unsubscribe(tsub, keep);
weixin_48148422's avatar
weixin_48148422 已提交
240
  taos_close(taos);
H
hzcheng 已提交
241 242 243

  return 0;
}