subscribe.c 8.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
7
#include <taos.h>  // include TDengine header file
weixin_48148422's avatar
weixin_48148422 已提交
8
#include <unistd.h>
9

10
int nTotalRows;
B
Bomin Zhang 已提交
11

12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
void print_result(TAOS_RES* res, int blockFetch) {
  TAOS_ROW    row = NULL;
  int         num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
  int         nRows = 0;
  
  if (blockFetch) {
    nRows = taos_fetch_block(res, &row);
    //for (int i = 0; i < nRows; i++) {
    //  taos_print_row(buf, row + i, fields, num_fields);
    //  puts(buf);
    //}
  } else {
    while ((row = taos_fetch_row(res))) {
      char buf[4096] = {0};
      taos_print_row(buf, row, fields, num_fields);
      puts(buf);
      nRows++;
30
    }
31
  }
weixin_48148422's avatar
weixin_48148422 已提交
32

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  nTotalRows += nRows;
  printf("%d rows consumed.\n", nRows);
}


void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  print_result(res, *(int*)param);
}


void check_row_count(int line, TAOS_RES* res, int expected) {
  int actual = 0;
  TAOS_ROW    row;
  while ((row = taos_fetch_row(res))) {
    actual++;
weixin_48148422's avatar
weixin_48148422 已提交
48
  }
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
  if (actual != expected) {
    printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual);
  } else {
    printf("line %d: %d rows consumed as expected\n", line, actual);
  }
}


void do_query(TAOS* taos, const char* sql) {
  TAOS_RES* res = taos_query(taos, sql);
  taos_free_result(res);
}


void run_test(TAOS* taos) {
  do_query(taos, "drop database if exists test;");
  
  usleep(100000);
  do_query(taos, "create database test;");
  usleep(100000);
  do_query(taos, "use test;");

  usleep(100000);
  do_query(taos, "create table meters(ts timestamp, a int) tags(area int);");

  do_query(taos, "create table t0 using meters tags(0);");
  do_query(taos, "create table t1 using meters tags(1);");
  do_query(taos, "create table t2 using meters tags(2);");
  do_query(taos, "create table t3 using meters tags(3);");
  do_query(taos, "create table t4 using meters tags(4);");
  do_query(taos, "create table t5 using meters tags(5);");
  do_query(taos, "create table t6 using meters tags(6);");
  do_query(taos, "create table t7 using meters tags(7);");
  do_query(taos, "create table t8 using meters tags(8);");
  do_query(taos, "create table t9 using meters tags(9);");

  do_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
  do_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
  do_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
  do_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
  do_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
  do_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
  do_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
  do_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");

  // super tables subscription
  usleep(1000000);

  TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  TAOS_RES* res = taos_consume(tsub);
  check_row_count(__LINE__, res, 18);

  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

  do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
  do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

  do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

  do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

  // keep progress information and restart subscription
  taos_unsubscribe(tsub, 1);
  do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
  tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 24);
134

135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
  // keep progress information and continue previous subscription
  taos_unsubscribe(tsub, 1);
  tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

  // don't keep progress information and continue previous subscription
  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 24);

  // single meter subscription

  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 5);

  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

  do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

  taos_unsubscribe(tsub, 0);
weixin_48148422's avatar
weixin_48148422 已提交
162 163 164
}


weixin_48148422's avatar
weixin_48148422 已提交
165
int main(int argc, char *argv[]) {
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";
  const char* sql = "select * from meters;";
  const char* topic = "test-multiple";
  int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;

  for (int i = 1; i < argc; i++) {
    if (strncmp(argv[i], "-h=", 3) == 0) {
      host = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-u=", 3) == 0) {
      user = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-p=", 3) == 0) {
      passwd = argv[i] + 3;
      continue;
    }
    if (strcmp(argv[i], "-sync") == 0) {
      async = 0;
      continue;
    }
    if (strcmp(argv[i], "-restart") == 0) {
      restart = 1;
      continue;
    }
    if (strcmp(argv[i], "-single") == 0) {
      sql = "select * from t0;";
      topic = "test-single";
      continue;
    }
    if (strcmp(argv[i], "-nokeep") == 0) {
      keep = 0;
      continue;
    }
    if (strncmp(argv[i], "-sql=", 5) == 0) {
      sql = argv[i] + 5;
      topic = "test-custom";
      continue;
    }
    if (strcmp(argv[i], "-test") == 0) {
      test = 1;
      continue;
    }
    if (strcmp(argv[i], "-block-fetch") == 0) {
      blockFetch = 1;
      continue;
    }
216 217
  }

218
  TAOS* taos = taos_connect(host, user, passwd, "", 0);
weixin_48148422's avatar
weixin_48148422 已提交
219
  if (taos == NULL) {
220
    printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
H
hzcheng 已提交
221 222
    exit(1);
  }
223 224 225 226 227

  if (test) {
    run_test(taos);
    taos_close(taos);
    exit(0);
228
  }
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251

  taos_select_db(taos, "test");
  TAOS_SUB* tsub = NULL;
  if (async) {
    // create an asynchronized subscription, the callback function will be called every 1s
    tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
  } else {
    // create an synchronized subscription, need to call 'taos_consume' manually
    tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
  }

  if (tsub == NULL) {
    printf("failed to create subscription.\n");
    exit(0);
  } 

  if (async) {
    getchar();
  } else while(1) {
    TAOS_RES* res = taos_consume(tsub);
    if (res == NULL) {
      printf("failed to consume data.");
      break;
252
    } else {
253 254
      print_result(res, blockFetch);
      getchar();
255
    }
H
hzcheng 已提交
256 257
  }

258 259 260
  printf("total rows consumed: %d\n", nTotalRows);
  taos_unsubscribe(tsub, keep);
  taos_close(taos);
H
hzcheng 已提交
261

262
  return 0;
H
hzcheng 已提交
263
}