subscribe.c 6.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
7
#include <taos.h>  // include TDengine header file
weixin_48148422's avatar
weixin_48148422 已提交
8
#include <unistd.h>
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26

// Print every remaining row of a result set to stdout, one row per line.
//
// res: result set to drain. A NULL result set is treated as "nothing to
//      print" so callers can pass the return value of taos_consume() or a
//      callback argument straight through.
void print_result(TAOS_RES* res) {
  if (res == NULL) {
    return;
  }

  int         num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
  TAOS_ROW    row;

  while ((row = taos_fetch_row(res))) {
    // taos_print_row() is called without a buffer size, so the buffer is kept
    // generous and zero-filled; puts() then always sees a terminated string.
    // NOTE(review): this is still a fixed bound - rows from a custom -sql=
    // query wider than the buffer would need a larger one.
    char temp[1024] = {0};
    taos_print_row(temp, row, fields, num_fields);
    puts(temp);
  }
}

// Callback handed to taos_subscribe() in asynchronous mode: prints the rows of
// the delivered result set. The subscription handle, the user parameter and
// the status code are not used.
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  (void)tsub;
  (void)param;
  (void)code;

  print_result(res);
}


weixin_48148422's avatar
weixin_48148422 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
// Drain a result set, count its rows and report whether the count matches.
//
// line:     source line of the check, echoed in the report
// res:      result set to drain with taos_fetch_row()
// expected: number of rows the caller expects to find
void check_row_count(int line, TAOS_RES* res, int expected) {
  int fetched = 0;

  while (taos_fetch_row(res)) {
    fetched++;
  }

  if (fetched == expected) {
    printf("line %d: %d rows consumed as expected\n", line, fetched);
    return;
  }

  printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, fetched);
}

// Self test for the subscription API: rebuilds database "test", seeds the
// super table "meters" and verifies, step by step, how many rows each
// taos_consume() call delivers. Results are reported by check_row_count().
//
// taos: an open connection; database "test" is dropped and recreated.
void run_test(TAOS* taos) {
  // Rebuild the schema from scratch. The short pauses between statements are
  // presumably there to let the server settle - TODO confirm they are needed.
  taos_query(taos, "drop database test;");
  usleep(100000);
  taos_query(taos, "create database test;");
  usleep(100000);
  taos_query(taos, "use test;");
  usleep(100000);
  taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);");

  // Seed data: 3 rows in t0, 4 rows in t1, 4 rows in t2 => 11 rows in total.
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');");
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');");

  // ---- super table subscription ----

  // First consume sees all 11 seeded rows; an immediate second one sees none.
  TAOS_SUB* sub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 11);
  check_row_count(__LINE__, taos_consume(sub), 0);

  // Two new rows (one in t2, one in t1) => exactly 2 rows delivered.
  taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.001', 0, 'UK');");
  taos_query(taos, "insert into t1 using meters tags('london', 0) values('2020-01-01 00:03:00.001', 0, 'UK');");
  check_row_count(__LINE__, taos_consume(sub), 2);

  // One more row in t1 => exactly 1 row delivered.
  taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.002', 0, 'china');");
  check_row_count(__LINE__, taos_consume(sub), 1);

  // Unsubscribe with keep=1, add a 15th row, then subscribe again with the
  // second argument set to 1: all 15 rows are expected.
  taos_unsubscribe(sub, 1);
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.000', 0, 'china');");
  sub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 15);

  // Unsubscribe with keep=0 and subscribe again with the second argument set
  // to 0: again all 15 rows are expected.
  taos_unsubscribe(sub, 0);
  sub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 15);

  // ---- single table subscription ----

  // t0 holds the 3 seeded rows plus the one added above => 4 rows, then none.
  taos_unsubscribe(sub, 0);
  sub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
  check_row_count(__LINE__, taos_consume(sub), 4);
  check_row_count(__LINE__, taos_consume(sub), 0);

  // One new row in t0 => 1 row delivered.
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.001', 0, 'china');");
  check_row_count(__LINE__, taos_consume(sub), 1);

  // Two new rows in t0 => 2 rows delivered.
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.002', 0, 'china');");
  taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:04:00.000', 0, 'china');");
  check_row_count(__LINE__, taos_consume(sub), 2);

  taos_unsubscribe(sub, 0);
}


weixin_48148422's avatar
weixin_48148422 已提交
116
int main(int argc, char *argv[]) {
117 118 119
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";
120
  const char* sql = "select * from meters;";
weixin_48148422's avatar
weixin_48148422 已提交
121
  const char* topic = "test-multiple";
weixin_48148422's avatar
weixin_48148422 已提交
122
  int async = 1, restart = 0, keep = 1, test = 0;
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
  TAOS_SUB* tsub = NULL;

  for (int i = 1; i < argc; i++) {
    if (strncmp(argv[i], "-h=", 3) == 0) {
      host = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-u=", 3) == 0) {
      user = argv[i] + 3;
      continue;
    }
    if (strncmp(argv[i], "-p=", 3) == 0) {
      passwd = argv[i] + 3;
      continue;
    }
138 139 140 141 142 143
    if (strcmp(argv[i], "-sync") == 0) {
      async = 0;
      continue;
    }
    if (strcmp(argv[i], "-restart") == 0) {
      restart = 1;
144 145
      continue;
    }
146 147
    if (strcmp(argv[i], "-single") == 0) {
      sql = "select * from t0;";
weixin_48148422's avatar
weixin_48148422 已提交
148 149 150 151 152
      topic = "test-single";
      continue;
    }
    if (strcmp(argv[i], "-nokeep") == 0) {
      keep = 0;
153 154
      continue;
    }
155 156 157 158 159
    if (strncmp(argv[i], "-sql=", 5) == 0) {
      sql = argv[i] + 5;
      topic = "test-custom";
      continue;
    }
weixin_48148422's avatar
weixin_48148422 已提交
160 161 162 163
    if (strcmp(argv[i], "-test") == 0) {
      test = 1;
      continue;
    }
164 165
  }

weixin_48148422's avatar
weixin_48148422 已提交
166 167
  // init TAOS
  taos_init();
H
hzcheng 已提交
168

169
  TAOS* taos = taos_connect(host, user, passwd, "test", 0);
weixin_48148422's avatar
weixin_48148422 已提交
170 171
  if (taos == NULL) {
    printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
H
hzcheng 已提交
172 173 174
    exit(1);
  }

weixin_48148422's avatar
weixin_48148422 已提交
175 176 177 178 179 180
  if (test) {
    run_test(taos);
    taos_close(taos);
    exit(0);
  }

181
  if (async) {
weixin_48148422's avatar
weixin_48148422 已提交
182
    tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, 1000);
183
  } else {
weixin_48148422's avatar
weixin_48148422 已提交
184
    tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
185 186 187
  }

  if (tsub == NULL) {
weixin_48148422's avatar
weixin_48148422 已提交
188 189 190
    printf("failed to create subscription.\n");
    exit(0);
  } 
H
hzcheng 已提交
191

192 193 194
  if (async) {
    getchar();
  } else while(1) {
weixin_48148422's avatar
weixin_48148422 已提交
195
    TAOS_RES* res = taos_consume(tsub);
196 197
    print_result(res);
    getchar();
H
hzcheng 已提交
198 199
  }

weixin_48148422's avatar
weixin_48148422 已提交
200
  taos_unsubscribe(tsub, keep);
weixin_48148422's avatar
weixin_48148422 已提交
201
  taos_close(taos);
H
hzcheng 已提交
202 203 204 205

  return 0;
}