tmq.c 8.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include <assert.h>
L
Liu Jicong 已提交
17 18
#include <stdio.h>
#include <string.h>
L
Liu Jicong 已提交
19
#include <time.h>
L
Liu Jicong 已提交
20
#include "taos.h"
21
#include "osSleep.h"
L
Liu Jicong 已提交
22

L
Liu Jicong 已提交
23 24 25
/* Loop-control flag polled by the consume loops in this file; it starts at 1
 * and no code visible here ever clears it. */
static int  running = 1;
/*
 * Prints one polled message to stdout: the topic name and vgroup id first,
 * then every row the result yields, one formatted row per line.
 */
static void msg_process(TAOS_RES* msg) {
  /* NOTE(review): taos_print_row() is given no buffer length here; presumably
   * a row that formats to more than 1024 bytes would overrun - confirm. */
  char line[1024] = {0};

  printf("topic: %s\n", tmq_get_topic_name(msg));
  printf("vg:%d\n", tmq_get_vgroup_id(msg));

  TAOS_ROW row;
  while ((row = taos_fetch_row(msg)) != NULL) {
    /* Column metadata is re-read for every row rather than hoisted out of
     * the loop. */
    TAOS_FIELD* columns = taos_fetch_fields(msg);
    int32_t     columnCount = taos_field_count(msg);

    taos_print_row(line, row, columns, columnCount);
    printf("%s\n", line);
  }
}
L
Liu Jicong 已提交
38 39 40 41 42 43 44

int32_t init_env() {
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

L
fix  
Liu Jicong 已提交
45
  TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
L
Liu Jicong 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
  if (taos_errno(pRes) != 0) {
    printf("error in create db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "use abc1");
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

L
Liu Jicong 已提交
59 60
  pRes =
      taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
61
  if (taos_errno(pRes) != 0) {
L
Liu Jicong 已提交
62
    printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
63 64 65 66
    return -1;
  }
  taos_free_result(pRes);

L
fix  
Liu Jicong 已提交
67
  pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000)");
68 69 70 71 72 73
  if (taos_errno(pRes) != 0) {
    printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

L
fix  
Liu Jicong 已提交
74
  pRes = taos_query(pConn, "create table if not exists ct1 using st1 tags(2000)");
75 76 77 78
  if (taos_errno(pRes) != 0) {
    printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
L
fix  
Liu Jicong 已提交
79 80 81 82 83 84 85

  pRes = taos_query(pConn, "create table if not exists ct3 using st1 tags(3000)");
  if (taos_errno(pRes) != 0) {
    printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
    return -1;
  }

86
  taos_free_result(pRes);
L
Liu Jicong 已提交
87 88 89 90
  return 0;
}

int32_t create_topic() {
L
Liu Jicong 已提交
91
  printf("create topic\n");
L
Liu Jicong 已提交
92 93 94 95 96 97 98 99 100 101 102 103
  TAOS_RES* pRes;
  TAOS*     pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

  pRes = taos_query(pConn, "use abc1");
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
104

L
Liu Jicong 已提交
105 106
  pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
  /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
L
fix  
Liu Jicong 已提交
107 108 109 110 111 112 113 114
  if (taos_errno(pRes) != 0) {
    printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

#if 0
  pRes = taos_query(pConn, "insert into tu1 values(now, 1, 1.0, 'bi1')");
115
  if (taos_errno(pRes) != 0) {
L
fix  
Liu Jicong 已提交
116
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
117 118 119
    return -1;
  }
  taos_free_result(pRes);
L
fix  
Liu Jicong 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
  pRes = taos_query(pConn, "insert into tu1 values(now+1d, 1, 1.0, 'bi1')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
  pRes = taos_query(pConn, "insert into tu2 values(now, 2, 2.0, 'bi2')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
  pRes = taos_query(pConn, "insert into tu2 values(now+1d, 2, 2.0, 'bi2')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
#endif

L
Liu Jicong 已提交
140 141 142 143
  taos_close(pConn);
  return 0;
}

L
Liu Jicong 已提交
144
/*
 * Offset-commit callback registered by build_consumer(): prints the commit
 * response code to stdout. The consumer handle and the offset list are not
 * used.
 */
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) {
  (void)tmq;
  (void)offsets;
  printf("commit %d\n", resp);
}

L
Liu Jicong 已提交
148
tmq_t* build_consumer() {
L
Liu Jicong 已提交
149
#if 0
L
Liu Jicong 已提交
150 151 152 153 154 155 156 157
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  TAOS_RES* pRes = taos_query(pConn, "use abc1");
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
  }
  taos_free_result(pRes);
L
Liu Jicong 已提交
158
#endif
L
Liu Jicong 已提交
159 160 161

  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "tg2");
L
Liu Jicong 已提交
162 163
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
L
Liu Jicong 已提交
164
  /*tmq_conf_set(conf, "td.connect.db", "abc1");*/
L
Liu Jicong 已提交
165
  tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
L
Liu Jicong 已提交
166
  tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
L
Liu Jicong 已提交
167
  assert(tmq);
L
Liu Jicong 已提交
168 169 170 171 172
  return tmq;
}

/*
 * Builds the subscription list for this example; it holds the single topic
 * "topic_ctb_column". The new list is returned to the caller.
 */
tmq_list_t* build_topic_list() {
  tmq_list_t* topics = tmq_list_new();

  tmq_list_append(topics, "topic_ctb_column");

  return topics;
}

177
/*
 * Subscribes `tmq` to `topics`, then polls with a 500 timeout for as long as
 * `running` is non-zero, releasing each received message without processing
 * it. The consumer is closed once the loop ends.
 *
 * A failed subscription is reported on stderr and stdout and the function
 * returns without polling or closing the consumer.
 */
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
  tmq_resp_err_t rc = tmq_subscribe(tmq, topics);
  if (rc) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(rc));
    printf("subscribe err\n");
    return;
  }

  /* Tallied per message but never reported. */
  int32_t received = 0;

  while (running) {
    TAOS_RES* polled = tmq_consumer_poll(tmq, 500);
    if (polled == NULL) {
      /* Nothing arrived within the timeout; poll again. */
      continue;
    }
    received++;
    /*msg_process(polled);*/
    taos_free_result(polled);
  }

  rc = tmq_consumer_close(tmq);
  if (rc) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(rc));
  } else {
    fprintf(stderr, "%% Consumer closed\n");
  }
}

208
/*
 * Subscribes `tmq` to `topics`, then polls with a 1000 timeout for as long as
 * `running` is non-zero. Every received message is printed through
 * msg_process() and then released. The consumer is closed once the loop ends.
 *
 * A failed subscription is reported on stderr and the function returns
 * without polling or closing the consumer.
 */
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
  /* Both are referenced only by the disabled commit statement in the loop. */
  static const int MIN_COMMIT_COUNT = 1;
  int              msg_count = 0;
  (void)MIN_COMMIT_COUNT;
  (void)msg_count;

  tmq_resp_err_t rc = tmq_subscribe(tmq, topics);
  if (rc) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(rc));
    return;
  }

  while (running) {
    TAOS_RES* polled = tmq_consumer_poll(tmq, 1000);
    if (polled == NULL) {
      /* Nothing arrived within the timeout; poll again. */
      continue;
    }

    msg_process(polled);
    taos_free_result(polled);

    /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
  }

  rc = tmq_consumer_close(tmq);
  if (rc) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(rc));
  } else {
    fprintf(stderr, "%% Consumer closed\n");
  }
}

L
Liu Jicong 已提交
236 237 238 239 240 241 242 243 244 245 246 247
/*
 * Subscribes `tmq` to `topics` and polls with a 500 timeout until either
 * `running` becomes zero or a poll returns no message. Each received message
 * is counted and released without being processed. Afterwards the batch
 * count, the (currently always zero) skip count and the elapsed clock() time
 * are printed and the consumer is closed.
 *
 * A failed subscription is reported on stderr and stdout and the function
 * returns without polling or closing the consumer.
 */
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
  tmq_resp_err_t rc = tmq_subscribe(tmq, topics);
  if (rc) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(rc));
    printf("subscribe err\n");
    return;
  }

  int32_t batches = 0;
  int32_t skipped = 0; /* only the disabled statement below would update it */

  /* clock() reports processor time, so time spent blocked inside the poll
   * call may not be reflected in the printed figure. */
  const clock_t begin = clock();
  while (running) {
    TAOS_RES* polled = tmq_consumer_poll(tmq, 500);
    if (polled == NULL) {
      /* The first empty poll ends the measurement. */
      break;
    }
    batches++;
    /*skipped += tmqGetSkipLogNum(polled);*/
    /*msg_process(polled);*/
    taos_free_result(polled);
  }
  const clock_t finish = clock();

  const double elapsed = (double)(finish - begin) / CLOCKS_PER_SEC;
  printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batches, skipped, elapsed);

  rc = tmq_consumer_close(tmq);
  if (rc) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(rc));
  } else {
    fprintf(stderr, "%% Consumer closed\n");
  }
}

L
Liu Jicong 已提交
269 270 271
int main(int argc, char* argv[]) {
  if (argc > 1) {
    printf("env init\n");
L
Liu Jicong 已提交
272 273 274
    if (init_env() < 0) {
      return -1;
    }
L
fix  
Liu Jicong 已提交
275
    create_topic();
L
Liu Jicong 已提交
276
  }
277
  tmq_t*      tmq = build_consumer();
L
Liu Jicong 已提交
278
  tmq_list_t* topic_list = build_topic_list();
L
Liu Jicong 已提交
279
  /*perf_loop(tmq, topic_list);*/
L
Liu Jicong 已提交
280 281
  /*basic_consume_loop(tmq, topic_list);*/
  sync_consume_loop(tmq, topic_list);
L
Liu Jicong 已提交
282
}