// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>  // include TDengine header file
#include <unistd.h>

// Print every remaining row of a result set to stdout, one row per line.
//
// res: result handle to drain; a NULL handle is ignored.
//
// Rows are pulled one at a time with taos_fetch_row() and formatted with
// taos_print_row(), so the result set has been fully fetched on return.
void print_result(TAOS_RES* res) {
  if (res == NULL) {
    return;
  }

  int         num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
  TAOS_ROW    row = NULL;

  while ((row = taos_fetch_row(res))) {
    // taos_print_row() is not given the buffer size, so leave generous
    // headroom for wide rows (the query may come from the -sql= option)
    // and start from an empty string in case nothing is written.
    char temp[4096] = {0};
    taos_print_row(temp, row, fields, num_fields);
    puts(temp);
  }
}

// Callback handed to taos_subscribe() by main when running asynchronously.
// It prints the rows delivered in `res`; the subscription handle, the user
// parameter and the status code are not used by this sample.
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  (void)tsub;
  (void)param;
  (void)code;

  print_result(res);
}


// Fetch every remaining row of `res`, count them, and report on stdout
// whether the count equals `expected`.
//
// line:     source line number to show in the report (callers pass __LINE__)
// res:      result set to drain
// expected: number of rows the caller expects to find
void check_row_count(int line, TAOS_RES* res, int expected) {
  int fetched = 0;
  for (TAOS_ROW r = taos_fetch_row(res); r; r = taos_fetch_row(res)) {
    fetched++;
  }

  if (fetched == expected) {
    printf("line %d: %d rows consumed as expected\n", line, fetched);
    return;
  }

  printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, fetched);
}

// Self-checking scenario for the subscribe/consume API.
//
// Recreates database `test`, inserts rows into tables t0, t1 and t2 of the
// super table `meters`, then walks through a series of subscribe / consume /
// unsubscribe steps. After each consume, check_row_count() prints whether the
// number of rows returned matches the expected value. Every step depends on
// the rows inserted by the steps before it, so the order must not change.
//
// taos: open connection used for all statements.
//
// The return values of taos_query() and taos_subscribe() are not checked.
void run_test(TAOS* taos) {
  taos_query(taos, "drop database test;");
  
  // 100 ms pauses between the DDL statements
  // NOTE(review): presumably gives the server time to apply each one - confirm
  usleep(100000);
  taos_query(taos, "create database test;");
  usleep(100000);
  taos_query(taos, "use test;");
  usleep(100000);
  taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);");

  // initial data set: 3 rows in t0, 4 rows in t1, 4 rows in t2 (11 in total)
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');");

  // super table subscription
  // No callback is passed (NULL, NULL, 0), so rows are pulled with
  // taos_consume(). The second argument is the position where main passes
  // its `restart` flag.

  TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  TAOS_RES* res = taos_consume(tsub);
  // first consume is expected to return all 11 initial rows
  check_row_count(__LINE__, res, 11);

  // nothing was inserted since the last consume, so 0 rows are expected
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

  // one new row each in t2 and t1 -> 2 rows expected
  // NOTE(review): the t1 insert names tags ('london', 0) although t1 was
  // created above with ('shanghai', 0); presumably the tags are ignored for
  // an existing table - confirm
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.001', 0, 'UK');");
  taos_query(taos, "insert into t1 using meters tags('london', 0) values('2020-01-01 00:03:00.001', 0, 'UK');");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

  // one new row in t1 -> 1 row expected
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.002', 0, 'china');");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

  // unsubscribe keeping progress information (second argument 1), insert one
  // more row (15 rows exist now), then subscribe to the same topic again with
  // the restart argument set to 1: all 15 rows are expected
  taos_unsubscribe(tsub, 1);
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
  tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 15);

  // unsubscribe without keeping progress information (second argument 0) and
  // subscribe again with the restart argument set to 0: all 15 rows are
  // expected again
  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 15);

  // single table subscription: same steps against t0 only

  taos_unsubscribe(tsub, 0);
  tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
  res = taos_consume(tsub);
  // t0 holds the 3 initial rows plus the one inserted above -> 4 rows expected
  check_row_count(__LINE__, res, 4);

  // no new rows -> 0 expected
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 0);

  // one new row -> 1 expected
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.001', 0, 'china');");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 1);

  // two new rows -> 2 expected
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.002', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:04:00.000', 0, 'china');");
  res = taos_consume(tsub);
  check_row_count(__LINE__, res, 2);

  // final cleanup: drop the subscription without keeping its progress
  taos_unsubscribe(tsub, 0);
}


int main(int argc, char *argv[]) {
130 131 132
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";
133
  const char* sql = "select * from meters;";
weixin_48148422's avatar
weixin_48148422 已提交
134
  const char* topic = "test-multiple";
weixin_48148422's avatar
weixin_48148422 已提交
135
  int async = 1, restart = 0, keep = 1, test = 0;
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
  TAOS_SUB* tsub = NULL;

  for (int i = 1; i < argc; i++) {
    if (strncmp(argv[i], "-h=", 3) == 0) {
      host = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-u=", 3) == 0) {
      user = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-p=", 3) == 0) {
      passwd = argv[i] + 3;
      continue;
    }
151 152 153 154 155 156
    if (strcmp(argv[i], "-sync") == 0) {
      async = 0;
      continue;
    }
    if (strcmp(argv[i], "-restart") == 0) {
      restart = 1;
157 158
      continue;
    }
159 160
    if (strcmp(argv[i], "-single") == 0) {
      sql = "select * from t0;";
weixin_48148422's avatar
weixin_48148422 已提交
161 162 163 164 165
      topic = "test-single";
      continue;
    }
    if (strcmp(argv[i], "-nokeep") == 0) {
      keep = 0;
166 167
      continue;
    }
168 169 170 171 172
    if (strncmp(argv[i], "-sql=", 5) == 0) {
      sql = argv[i] + 5;
      topic = "test-custom";
      continue;
    }
weixin_48148422's avatar
weixin_48148422 已提交
173 174 175 176
    if (strcmp(argv[i], "-test") == 0) {
      test = 1;
      continue;
    }
177 178
  }

weixin_48148422's avatar
weixin_48148422 已提交
179 180
  // init TAOS
  taos_init();
H
hzcheng 已提交
181

182
  TAOS* taos = taos_connect(host, user, passwd, "test", 0);
weixin_48148422's avatar
weixin_48148422 已提交
183 184
  if (taos == NULL) {
    printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
H
hzcheng 已提交
185 186 187
    exit(1);
  }

weixin_48148422's avatar
weixin_48148422 已提交
188 189 190 191 192 193
  if (test) {
    run_test(taos);
    taos_close(taos);
    exit(0);
  }

194
  if (async) {
weixin_48148422's avatar
weixin_48148422 已提交
195
    tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, 1000);
196
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
197
    tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
198 199 200
  }

  if (tsub == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
201 202 203
    printf("failed to create subscription.\n");
    exit(0);
  } 
H
hzcheng 已提交
204

205 206 207
  if (async) {
    getchar();
  } else while(1) {
weixin_48148422's avatar
weixin_48148422 已提交
208
    TAOS_RES* res = taos_consume(tsub);
209 210
    print_result(res);
    getchar();
H
hzcheng 已提交
211 212
  }

weixin_48148422's avatar
weixin_48148422 已提交
213
  taos_unsubscribe(tsub, keep);
weixin_48148422's avatar
weixin_48148422 已提交
214
  taos_close(taos);
H
hzcheng 已提交
215 216 217 218

  return 0;
}