/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include "taos.h"

/* Keep-running flag tested by the consume loops; it starts at 1 and nothing in the visible code clears it. */
static int  running = 1;
/*
 * Print one polled TMQ message to stdout.
 *
 * Always prints the topic name, database name and vgroup id of the message.
 * For a table-meta message only the raw meta length is printed; otherwise
 * every row of the message is printed, followed by the originating table
 * name when one is available.
 *
 * msg: result handle returned by tmq_consumer_poll(); the caller keeps
 *      ownership and frees it.
 */
static void msg_process(TAOS_RES* msg) {
  printf("topic: %s\n", tmq_get_topic_name(msg));
  printf("db: %s\n", tmq_get_db_name(msg));
  printf("vg: %d\n", tmq_get_vgroup_id(msg));

  if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
    /* Initialised so that a call which fills nothing still prints a defined length. */
    void*   meta = NULL;
    int32_t metaLen = 0;
    tmq_get_raw_meta(msg, &meta, &metaLen);

    printf("meta, len is %d\n", metaLen);
    return;
  }

  char     buf[1024];
  TAOS_ROW row;
  while ((row = taos_fetch_row(msg)) != NULL) {
    /*
     * The field description is re-read for every row, as in the original.
     * NOTE(review): presumably the schema can differ between the data blocks
     * of one message - confirm before hoisting these calls out of the loop.
     */
    TAOS_FIELD* fields = taos_fetch_fields(msg);
    int32_t     numOfFields = taos_field_count(msg);

    /* Start from an empty string so nothing stale is printed if no text is produced. */
    buf[0] = '\0';
    taos_print_row(buf, row, fields, numOfFields);
    printf("%s\n", buf);

    const char* tbName = tmq_get_table_name(msg);
    if (tbName) {
      printf("from tb: %s\n", tbName);
    }
  }
}

/*
 * Run one SQL statement on conn and release its result handle.
 *
 * On failure prints "<what>, reason:<error text>" to stdout.
 * Returns 0 on success, -1 on failure.
 */
static int32_t init_env_exec(TAOS* conn, const char* sql, const char* what) {
  TAOS_RES* res = taos_query(conn, sql);
  int32_t   rc = 0;

  if (taos_errno(res) != 0) {
    printf("%s, reason:%s\n", what, taos_errstr(res));
    rc = -1;
  }
  /* Released on both paths; the error path used to leak the handle. */
  taos_free_result(res);
  return rc;
}

/*
 * Prepare the demo schema: database abc1, super table st1, and the child
 * tables ct0, ct1 and ct3 with one row inserted into each.
 *
 * The statements run in order and the first failure stops the sequence.
 * The connection opened here is closed before returning on every path.
 *
 * Returns 0 on success, -1 if the connection or any statement fails.
 */
int32_t init_env() {
  static const struct {
    const char* sql;  /* statement to execute */
    const char* what; /* text printed in front of ", reason:" on failure */
  } steps[] = {
      {"create database if not exists abc1 vgroups 1", "error in create db"},
      {"use abc1", "error in use db"},
      {"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int)",
       "failed to create super table st1"},
      {"create table if not exists ct0 using st1 tags(1000)", "failed to create child table ct0"},
      {"insert into ct0 values(now, 1, 2, 'a')", "failed to insert into ct0"},
      {"create table if not exists ct1 using st1 tags(2000)", "failed to create child table ct1"},
      {"insert into ct1 values(now, 3, 4, 'b')", "failed to insert into ct1"},
      {"create table if not exists ct3 using st1 tags(3000)", "failed to create child table ct3"},
      {"insert into ct3 values(now, 5, 6, 'c')", "failed to insert into ct3"},
  };

  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

  int32_t rc = 0;
  for (size_t i = 0; i < sizeof(steps) / sizeof(steps[0]); i++) {
    if (init_env_exec(pConn, steps[i].sql, steps[i].what) != 0) {
      rc = -1;
      break;
    }
  }

  taos_close(pConn);
  return rc;
}

int32_t create_topic() {
L
Liu Jicong 已提交
126
  printf("create topic\n");
L
Liu Jicong 已提交
127 128 129 130 131 132 133 134 135 136 137 138
  TAOS_RES* pRes;
  TAOS*     pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

  pRes = taos_query(pConn, "use abc1");
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
139

L
Liu Jicong 已提交
140
  /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
L
Liu Jicong 已提交
141
  pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
L
fix  
Liu Jicong 已提交
142 143 144 145 146 147 148 149
  if (taos_errno(pRes) != 0) {
    printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

#if 0
  pRes = taos_query(pConn, "insert into tu1 values(now, 1, 1.0, 'bi1')");
150
  if (taos_errno(pRes) != 0) {
L
fix  
Liu Jicong 已提交
151
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
152 153 154
    return -1;
  }
  taos_free_result(pRes);
L
fix  
Liu Jicong 已提交
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
  pRes = taos_query(pConn, "insert into tu1 values(now+1d, 1, 1.0, 'bi1')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
  pRes = taos_query(pConn, "insert into tu2 values(now, 2, 2.0, 'bi2')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
  pRes = taos_query(pConn, "insert into tu2 values(now+1d, 2, 2.0, 'bi2')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
#endif

L
Liu Jicong 已提交
175 176 177 178
  taos_close(pConn);
  return 0;
}

/*
 * Commit callback: prints the commit result code together with the consumer
 * pointer and the user parameter it was invoked with.
 */
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
  /* %p requires a void* argument, so the consumer pointer is cast explicitly. */
  printf("commit %d tmq %p param %p\n", code, (void*)tmq, param);
}

tmq_t* build_consumer() {
L
Liu Jicong 已提交
184
#if 0
L
Liu Jicong 已提交
185 186 187 188 189 190 191 192
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  TAOS_RES* pRes = taos_query(pConn, "use abc1");
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
  }
  taos_free_result(pRes);
L
Liu Jicong 已提交
193
#endif
L
Liu Jicong 已提交
194 195 196

  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "tg2");
L
Liu Jicong 已提交
197 198
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
199
  tmq_conf_set(conf, "msg.with.table.name", "true");
L
Liu Jicong 已提交
200
  tmq_conf_set(conf, "enable.auto.commit", "true");
L
Liu Jicong 已提交
201

L
Liu Jicong 已提交
202
  tmq_conf_set(conf, "experiment.use.snapshot", "false");
L
Liu Jicong 已提交
203

L
Liu Jicong 已提交
204
  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
L
Liu Jicong 已提交
205
  tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
L
Liu Jicong 已提交
206
  assert(tmq);
L
Liu Jicong 已提交
207
  tmq_conf_destroy(conf);
L
Liu Jicong 已提交
208 209 210 211 212
  return tmq;
}

/*
 * Create a new topic list holding the single entry "topic_ctb_column".
 * The list is returned to the caller.
 */
tmq_list_t* build_topic_list() {
  tmq_list_t* topics = tmq_list_new();

  tmq_list_append(topics, "topic_ctb_column");
  return topics;
}

/*
 * Subscribe tmq to topics, then poll until the file-level `running` flag
 * becomes zero. Each message is printed with msg_process() and freed.
 * Polling uses a timeout argument of -1. The consumer is closed before
 * returning.
 *
 * If subscribing fails, the error is reported and the function returns
 * without polling and without closing the consumer.
 */
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
  int32_t code;

  if ((code = tmq_subscribe(tmq, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
    printf("subscribe err\n");
    return;
  }

  while (running) {
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, -1);
    if (tmqmessage) {
      msg_process(tmqmessage);
      taos_free_result(tmqmessage);
    }
  }

  code = tmq_consumer_close(tmq);
  if (code) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
  } else {
    fprintf(stderr, "%% Consumer closed\n");
  }
}

/*
 * Subscribe tmq to topics, print the topics the consumer reports as
 * subscribed, then poll with a timeout argument of 1000 until the file-level
 * `running` flag becomes zero. Each message is printed with msg_process()
 * and freed. The consumer is closed before returning.
 *
 * If subscribing fails, the error is reported and the function returns
 * immediately. Failing to read back the subscription is reported but does
 * not stop consumption.
 */
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
  int32_t code;

  if ((code = tmq_subscribe(tmq, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
    return;
  }

  tmq_list_t* subList = NULL;
  code = tmq_subscription(tmq, &subList);
  if (code) {
    fprintf(stderr, "%% Failed to get subscription: %s\n", tmq_err2str(code));
  } else if (subList != NULL) {
    char**  subTopics = tmq_list_to_c_array(subList);
    int32_t sz = tmq_list_get_size(subList);
    printf("subscribed topics: ");
    for (int32_t i = 0; i < sz; i++) {
      printf("%s, ", subTopics[i]);
    }
    printf("\n");
  }
  /* Only destroy a list that was actually handed back. */
  if (subList != NULL) {
    tmq_list_destroy(subList);
  }

  while (running) {
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
    if (tmqmessage) {
      msg_process(tmqmessage);
      taos_free_result(tmqmessage);
    }
  }

  code = tmq_consumer_close(tmq);
  if (code) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
  } else {
    fprintf(stderr, "%% Consumer closed\n");
  }
}

int main(int argc, char* argv[]) {
  if (argc > 1) {
    printf("env init\n");
L
Liu Jicong 已提交
291 292 293
    if (init_env() < 0) {
      return -1;
    }
L
fix  
Liu Jicong 已提交
294
    create_topic();
L
Liu Jicong 已提交
295
  }
296
  tmq_t*      tmq = build_consumer();
L
Liu Jicong 已提交
297
  tmq_list_t* topic_list = build_topic_list();
298 299
  basic_consume_loop(tmq, topic_list);
  /*sync_consume_loop(tmq, topic_list);*/
L
Liu Jicong 已提交
300
}