tmq.c 8.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"

23
static int running = 1;
24 25

static int32_t msg_process(TAOS_RES* msg) {
L
Liu Jicong 已提交
26
  char    buf[1024];
27 28 29
  int32_t rows = 0;

  const char* topicName = tmq_get_topic_name(msg);
L
Liu Jicong 已提交
30 31
  const char* dbName = tmq_get_db_name(msg);
  int32_t     vgroupId = tmq_get_vgroup_id(msg);
32 33 34 35 36 37 38 39 40

  printf("topic: %s\n", topicName);
  printf("db: %s\n", dbName);
  printf("vgroup id: %d\n", vgroupId);

  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
    if (row == NULL) break;

L
Liu Jicong 已提交
41
    TAOS_FIELD* fields = taos_fetch_fields(msg);
42
    int32_t     numOfFields = taos_field_count(msg);
43 44
    // int32_t*    length = taos_fetch_lengths(msg);
    int32_t precision = taos_result_precision(msg);
L
Liu Jicong 已提交
45
    rows++;
46
    taos_print_row(buf, row, fields, numOfFields);
L
Liu Jicong 已提交
47
    printf("precision: %d, row content: %s\n", precision, buf);
48 49 50 51 52 53 54 55 56 57 58 59 60 61
  }

  return rows;
}

static int32_t init_env() {
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

  TAOS_RES* pRes;
  // drop database if exists
  printf("create database\n");
D
dapan1121 已提交
62 63 64 65 66 67
  pRes = taos_query(pConn, "drop topic topicname");
  if (taos_errno(pRes) != 0) {
    printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
  }
  taos_free_result(pRes);

68 69 70 71 72 73 74
  pRes = taos_query(pConn, "drop database if exists tmqdb");
  if (taos_errno(pRes) != 0) {
    printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
  }
  taos_free_result(pRes);

  // create database
L
Liu Jicong 已提交
75
  pRes = taos_query(pConn, "create database tmqdb precision 'ns'");
76 77
  if (taos_errno(pRes) != 0) {
    printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
78
    goto END;
79 80 81 82 83
  }
  taos_free_result(pRes);

  // create super table
  printf("create super table\n");
L
Liu Jicong 已提交
84 85
  pRes = taos_query(
      pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
86 87
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
88
    goto END;
89 90 91 92 93 94 95 96
  }
  taos_free_result(pRes);

  // create sub tables
  printf("create sub tables\n");
  pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
97
    goto END;
98 99 100 101 102 103
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
104
    goto END;
105 106 107 108 109 110
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
111
    goto END;
112 113 114 115 116 117
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
118
    goto END;
119 120 121 122 123 124 125 126
  }
  taos_free_result(pRes);

  // insert data
  printf("insert data into sub tables\n");
  pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
127
    goto END;
128 129 130 131 132 133
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
134
    goto END;
135 136 137 138 139 140
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
141
    goto END;
142 143 144 145 146 147
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
148
    goto END;
149 150 151 152
  }
  taos_free_result(pRes);
  taos_close(pConn);
  return 0;
153 154 155 156 157

END:
  taos_free_result(pRes);
  taos_close(pConn);
  return -1;
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
}

int32_t create_topic() {
  printf("create topic\n");
  TAOS_RES* pRes;
  TAOS*     pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

  pRes = taos_query(pConn, "use tmqdb");
  if (taos_errno(pRes) != 0) {
    printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

175
  pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
  if (taos_errno(pRes) != 0) {
    printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  taos_close(pConn);
  return 0;
}

void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
  printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
}

tmq_t* build_consumer() {
  tmq_conf_res_t code;
X
Xiaoyu Wang 已提交
192
  tmq_t*         tmq = NULL;
H
Haojun Liao 已提交
193

X
Xiaoyu Wang 已提交
194
  tmq_conf_t* conf = tmq_conf_new();
195
  code = tmq_conf_set(conf, "enable.auto.commit", "true");
H
Haojun Liao 已提交
196
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
197 198
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
199
  }
200
  code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
H
Haojun Liao 已提交
201
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
202 203
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
204
  }
205
  code = tmq_conf_set(conf, "group.id", "cgrpName");
H
Haojun Liao 已提交
206
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
207 208
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
209
  }
L
Liu Jicong 已提交
210
  code = tmq_conf_set(conf, "client.id", "user defined name");
H
Haojun Liao 已提交
211
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
212 213
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
214
  }
215
  code = tmq_conf_set(conf, "td.connect.user", "root");
H
Haojun Liao 已提交
216
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
217 218
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
219
  }
220
  code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
H
Haojun Liao 已提交
221
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
222 223
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
224
  }
L
Liu Jicong 已提交
225
  code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
H
Haojun Liao 已提交
226
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
227 228
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
229
  }
230
  code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
H
Haojun Liao 已提交
231
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
232 233
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
234
  }
L
Liu Jicong 已提交
235 236

  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
H
Haojun Liao 已提交
237
  tmq = tmq_consumer_new(conf, NULL, 0);
L
Liu Jicong 已提交
238

X
Xiaoyu Wang 已提交
239
_end:
240 241 242 243 244 245
  tmq_conf_destroy(conf);
  return tmq;
}

tmq_list_t* build_topic_list() {
  tmq_list_t* topicList = tmq_list_new();
L
Liu Jicong 已提交
246
  int32_t     code = tmq_list_append(topicList, "topicname");
247
  if (code) {
G
Ganlin Zhao 已提交
248
    tmq_list_destroy(topicList);
249 250 251 252 253
    return NULL;
  }
  return topicList;
}

254
void basic_consume_loop(tmq_t* tmq) {
255 256
  int32_t totalRows = 0;
  int32_t msgCnt = 0;
L
Liu Jicong 已提交
257
  int32_t timeout = 5000;
258
  while (running) {
L
Liu Jicong 已提交
259
    TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
260 261 262 263
    if (tmqmsg) {
      msgCnt++;
      totalRows += msg_process(tmqmsg);
      taos_free_result(tmqmsg);
264 265
    } else {
      break;
L
Liu Jicong 已提交
266
    }
267
  }
L
Liu Jicong 已提交
268

269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
  fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}

int main(int argc, char* argv[]) {
  int32_t code;

  if (init_env() < 0) {
    return -1;
  }

  if (create_topic() < 0) {
    return -1;
  }

  tmq_t* tmq = build_consumer();
  if (NULL == tmq) {
D
dapan1121 已提交
285
    fprintf(stderr, "build_consumer() fail!\n");
286 287 288 289 290 291
    return -1;
  }

  tmq_list_t* topic_list = build_topic_list();
  if (NULL == topic_list) {
    return -1;
L
Liu Jicong 已提交
292 293
  }

294
  if ((code = tmq_subscribe(tmq, topic_list))) {
D
dapan1121 已提交
295
    fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
296
  }
297 298 299
  tmq_list_destroy(topic_list);

  basic_consume_loop(tmq);
300 301 302

  code = tmq_consumer_close(tmq);
  if (code) {
D
dapan1121 已提交
303
    fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
L
Liu Jicong 已提交
304
  } else {
D
dapan1121 已提交
305
    fprintf(stderr, "Consumer closed\n");
306
  }
L
Liu Jicong 已提交
307

308 309
  return 0;
}