// subscribe.c
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
7
#include <taos.h>  // include TDengine header file
weixin_48148422's avatar
weixin_48148422 已提交
8
#include <unistd.h>
9

10
// Print the rows of a result set to stdout, one formatted row per line.
//
//   res        - result set to read from
//   blockFetch - zero: rows are pulled one at a time with taos_fetch_row()
//                until it returns NULL; non-zero: a single taos_fetch_block()
//                call is made and each of the rows it reports is printed
//
// NOTE(review): taos_print_row() is given no buffer size here; the 256-byte
// line buffer presumably covers the sample schema -- confirm before using
// this with wide rows (e.g. a custom -sql query).
void print_result(TAOS_RES* res, int blockFetch) {
  TAOS_ROW    cur = NULL;
  int         field_count = taos_num_fields(res);
  TAOS_FIELD* field_info = taos_fetch_fields(res);

  if (!blockFetch) {
    // row-by-row mode
    for (cur = taos_fetch_row(res); cur; cur = taos_fetch_row(res)) {
      char line[256];
      taos_print_row(line, cur, field_info, field_count);
      puts(line);
    }
    return;
  }

  // block mode: one fetch, then walk the returned rows by index
  int row_count = taos_fetch_block(res, &cur);
  int idx = 0;
  while (idx < row_count) {
    char line[256];
    taos_print_row(line, cur + idx, field_info, field_count);
    puts(line);
    idx++;
  }
}

31

32
// Callback passed to taos_subscribe() for asynchronous subscriptions.
// 'param' is read as a pointer to an int block-fetch flag and forwarded,
// together with 'res', to print_result(). 'tsub' and 'code' are not used.
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  const int* block_fetch_flag = (const int*)param;
  print_result(res, *block_fetch_flag);
}


weixin_48148422's avatar
weixin_48148422 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49
// Drain 'res' with taos_fetch_row() and report whether the number of rows
// fetched equals 'expected'. 'line' is echoed in the message so the caller
// can pass __LINE__ to identify which check produced it.
void check_row_count(int line, TAOS_RES* res, int expected) {
  int fetched = 0;
  while (taos_fetch_row(res) != NULL) {
    ++fetched;
  }

  if (fetched == expected) {
    printf("line %d: %d rows consumed as expected\n", line, fetched);
    return;
  }
  printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, fetched);
}

50

weixin_48148422's avatar
weixin_48148422 已提交
51
// Self-check scenario for the subscription API.
//
// Recreates database 'test' with super table 'meters', loads 18 rows spread
// over tables t0..t9, then drives a sequence of subscribe / consume /
// unsubscribe calls, passing every consumed result to check_row_count()
// together with the row count the scenario expects at that point.
void run_test(TAOS* taos) {
  enum { SETTLE_US = 100000 };  // pause between the schema statements
  const char* topic = "test";
  const char* super_sql = "select * from meters;";
  const char* single_sql = "select * from t0;";

  // ---- schema -------------------------------------------------------------
  taos_query(taos, "drop database if exists test;");
  usleep(SETTLE_US);
  taos_query(taos, "create database test tables 5;");
  usleep(SETTLE_US);
  taos_query(taos, "use test;");
  usleep(SETTLE_US);
  taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);");

  // ---- initial data: 3 + 4 + 4 + 7 = 18 rows ------------------------------
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');");
  taos_query(taos, "insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
  taos_query(taos, "insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');");

  // ---- super table subscription -------------------------------------------
  TAOS_SUB* sub = taos_subscribe(taos, 0, topic, super_sql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 18);

  // nothing new was written, so the next consume is expected to be empty
  check_row_count(__LINE__, taos_consume(sub), 0);

  taos_query(taos, "insert into t0 values('2020-01-01 00:03:00.000', 0, 'china');");
  taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0, 'china');");
  check_row_count(__LINE__, taos_consume(sub), 2);

  taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0, 'UK');");
  taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0, 'UK');");
  check_row_count(__LINE__, taos_consume(sub), 2);

  taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0, 'china');");
  check_row_count(__LINE__, taos_consume(sub), 1);

  // keep progress information and restart subscription
  taos_unsubscribe(sub, 1);
  taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0, 'china');");
  sub = taos_subscribe(taos, 1, topic, super_sql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 24);

  // keep progress information and continue previous subscription
  taos_unsubscribe(sub, 1);
  sub = taos_subscribe(taos, 0, topic, super_sql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 0);

  // don't keep progress information and continue previous subscription
  taos_unsubscribe(sub, 0);
  sub = taos_subscribe(taos, 0, topic, super_sql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 24);

  // ---- single meter subscription ------------------------------------------
  taos_unsubscribe(sub, 0);
  sub = taos_subscribe(taos, 0, topic, single_sql, NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 5);

  check_row_count(__LINE__, taos_consume(sub), 0);

  taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0, 'china');");
  check_row_count(__LINE__, taos_consume(sub), 1);

  taos_unsubscribe(sub, 0);
}


weixin_48148422's avatar
weixin_48148422 已提交
140
int main(int argc, char *argv[]) {
141 142 143
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";
144
  const char* sql = "select * from meters;";
weixin_48148422's avatar
weixin_48148422 已提交
145
  const char* topic = "test-multiple";
146
  int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
147 148 149 150 151 152 153 154 155 156 157 158 159 160

  for (int i = 1; i < argc; i++) {
    if (strncmp(argv[i], "-h=", 3) == 0) {
      host = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-u=", 3) == 0) {
      user = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-p=", 3) == 0) {
      passwd = argv[i] + 3;
      continue;
    }
161 162 163 164 165 166
    if (strcmp(argv[i], "-sync") == 0) {
      async = 0;
      continue;
    }
    if (strcmp(argv[i], "-restart") == 0) {
      restart = 1;
167 168
      continue;
    }
169 170
    if (strcmp(argv[i], "-single") == 0) {
      sql = "select * from t0;";
weixin_48148422's avatar
weixin_48148422 已提交
171 172 173 174 175
      topic = "test-single";
      continue;
    }
    if (strcmp(argv[i], "-nokeep") == 0) {
      keep = 0;
176 177
      continue;
    }
178 179 180 181 182
    if (strncmp(argv[i], "-sql=", 5) == 0) {
      sql = argv[i] + 5;
      topic = "test-custom";
      continue;
    }
weixin_48148422's avatar
weixin_48148422 已提交
183 184 185 186
    if (strcmp(argv[i], "-test") == 0) {
      test = 1;
      continue;
    }
187 188 189 190
    if (strcmp(argv[i], "-block-fetch") == 0) {
      blockFetch = 1;
      continue;
    }
191 192
  }

weixin_48148422's avatar
weixin_48148422 已提交
193 194
  // init TAOS
  taos_init();
H
hzcheng 已提交
195

196
  TAOS* taos = taos_connect(host, user, passwd, "test", 0);
weixin_48148422's avatar
weixin_48148422 已提交
197 198
  if (taos == NULL) {
    printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
H
hzcheng 已提交
199 200 201
    exit(1);
  }

weixin_48148422's avatar
weixin_48148422 已提交
202 203 204 205 206 207
  if (test) {
    run_test(taos);
    taos_close(taos);
    exit(0);
  }

208
  TAOS_SUB* tsub = NULL;
209
  if (async) {
210 211
    // create an asynchronized subscription, the callback function will be called every 1s
    tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
212
  } else {
213
    // create an synchronized subscription, need to call 'taos_consume' manually
weixin_48148422's avatar
weixin_48148422 已提交
214
    tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
215 216 217
  }

  if (tsub == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
218 219 220
    printf("failed to create subscription.\n");
    exit(0);
  } 
H
hzcheng 已提交
221

222 223 224
  if (async) {
    getchar();
  } else while(1) {
weixin_48148422's avatar
weixin_48148422 已提交
225
    TAOS_RES* res = taos_consume(tsub);
226 227 228 229 230 231 232
    if (res == NULL) {
      printf("failed to consume data.");
      break;
    } else {
      print_result(res, blockFetch);
      getchar();
    }
H
hzcheng 已提交
233 234
  }

weixin_48148422's avatar
weixin_48148422 已提交
235
  taos_unsubscribe(tsub, keep);
weixin_48148422's avatar
weixin_48148422 已提交
236
  taos_close(taos);
H
hzcheng 已提交
237 238 239

  return 0;
}