tmq.c 9.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"

23 24
static int  running = 1;
const char* topic_name = "topicname";
25 26

static int32_t msg_process(TAOS_RES* msg) {
L
Liu Jicong 已提交
27
  char    buf[1024];
28 29 30
  int32_t rows = 0;

  const char* topicName = tmq_get_topic_name(msg);
L
Liu Jicong 已提交
31 32
  const char* dbName = tmq_get_db_name(msg);
  int32_t     vgroupId = tmq_get_vgroup_id(msg);
33 34 35 36 37 38 39 40 41

  printf("topic: %s\n", topicName);
  printf("db: %s\n", dbName);
  printf("vgroup id: %d\n", vgroupId);

  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
    if (row == NULL) break;

L
Liu Jicong 已提交
42
    TAOS_FIELD* fields = taos_fetch_fields(msg);
43
    int32_t     numOfFields = taos_field_count(msg);
44 45
    // int32_t*    length = taos_fetch_lengths(msg);
    int32_t precision = taos_result_precision(msg);
L
Liu Jicong 已提交
46
    rows++;
47
    taos_print_row(buf, row, fields, numOfFields);
L
Liu Jicong 已提交
48
    printf("precision: %d, row content: %s\n", precision, buf);
49 50 51 52 53 54 55 56 57 58 59 60 61 62
  }

  return rows;
}

static int32_t init_env() {
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

  TAOS_RES* pRes;
  // drop database if exists
  printf("create database\n");
D
dapan1121 已提交
63 64
  pRes = taos_query(pConn, "drop topic topicname");
  if (taos_errno(pRes) != 0) {
wmmhello's avatar
wmmhello 已提交
65
    printf("error in drop topicname, reason:%s\n", taos_errstr(pRes));
D
dapan1121 已提交
66 67 68
  }
  taos_free_result(pRes);

69 70 71 72 73 74 75
  pRes = taos_query(pConn, "drop database if exists tmqdb");
  if (taos_errno(pRes) != 0) {
    printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
  }
  taos_free_result(pRes);

  // create database
T
t_max 已提交
76
  pRes = taos_query(pConn, "create database tmqdb precision 'ns' WAL_RETENTION_PERIOD 3600");
77 78
  if (taos_errno(pRes) != 0) {
    printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
79
    goto END;
80 81 82 83 84
  }
  taos_free_result(pRes);

  // create super table
  printf("create super table\n");
L
Liu Jicong 已提交
85 86
  pRes = taos_query(
      pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
87 88
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
89
    goto END;
90 91 92 93 94 95 96 97
  }
  taos_free_result(pRes);

  // create sub tables
  printf("create sub tables\n");
  pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
98
    goto END;
99 100 101 102 103 104
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
105
    goto END;
106 107 108 109 110 111
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
112
    goto END;
113 114 115 116 117 118
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
119
    goto END;
120 121 122 123 124 125 126 127
  }
  taos_free_result(pRes);

  // insert data
  printf("insert data into sub tables\n");
  pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
128
    goto END;
129 130 131 132 133 134
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
135
    goto END;
136 137 138 139 140 141
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
142
    goto END;
143 144 145 146 147 148
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
149
    goto END;
150 151 152 153
  }
  taos_free_result(pRes);
  taos_close(pConn);
  return 0;
154 155 156 157 158

END:
  taos_free_result(pRes);
  taos_close(pConn);
  return -1;
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
}

int32_t create_topic() {
  printf("create topic\n");
  TAOS_RES* pRes;
  TAOS*     pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

  pRes = taos_query(pConn, "use tmqdb");
  if (taos_errno(pRes) != 0) {
    printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

176
  pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
  if (taos_errno(pRes) != 0) {
    printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  taos_close(pConn);
  return 0;
}

void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
  printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
}

tmq_t* build_consumer() {
  tmq_conf_res_t code;
X
Xiaoyu Wang 已提交
193
  tmq_t*         tmq = NULL;
H
Haojun Liao 已提交
194

X
Xiaoyu Wang 已提交
195
  tmq_conf_t* conf = tmq_conf_new();
196
  code = tmq_conf_set(conf, "enable.auto.commit", "true");
H
Haojun Liao 已提交
197
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
198 199
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
200
  }
201
  code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
H
Haojun Liao 已提交
202
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
203 204
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
205
  }
206
  code = tmq_conf_set(conf, "group.id", "cgrpName");
H
Haojun Liao 已提交
207
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
208 209
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
210
  }
L
Liu Jicong 已提交
211
  code = tmq_conf_set(conf, "client.id", "user defined name");
H
Haojun Liao 已提交
212
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
213 214
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
215
  }
216
  code = tmq_conf_set(conf, "td.connect.user", "root");
H
Haojun Liao 已提交
217
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
218 219
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
220
  }
221
  code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
H
Haojun Liao 已提交
222
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
223 224
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
225
  }
L
Liu Jicong 已提交
226
  code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
H
Haojun Liao 已提交
227
  if (TMQ_CONF_OK != code) {
H
Haojun Liao 已提交
228 229
    tmq_conf_destroy(conf);
    return NULL;
H
Haojun Liao 已提交
230
  }
L
Liu Jicong 已提交
231 232

  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
H
Haojun Liao 已提交
233
  tmq = tmq_consumer_new(conf, NULL, 0);
L
Liu Jicong 已提交
234

X
Xiaoyu Wang 已提交
235
_end:
236 237 238 239 240 241
  tmq_conf_destroy(conf);
  return tmq;
}

tmq_list_t* build_topic_list() {
  tmq_list_t* topicList = tmq_list_new();
242
  int32_t     code = tmq_list_append(topicList, topic_name);
243
  if (code) {
G
Ganlin Zhao 已提交
244
    tmq_list_destroy(topicList);
245 246 247 248 249
    return NULL;
  }
  return topicList;
}

250
void basic_consume_loop(tmq_t* tmq) {
251 252
  int32_t totalRows = 0;
  int32_t msgCnt = 0;
L
Liu Jicong 已提交
253
  int32_t timeout = 5000;
254
  while (running) {
L
Liu Jicong 已提交
255
    TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
256 257 258 259
    if (tmqmsg) {
      msgCnt++;
      totalRows += msg_process(tmqmsg);
      taos_free_result(tmqmsg);
260 261
    } else {
      break;
L
Liu Jicong 已提交
262
    }
263
  }
L
Liu Jicong 已提交
264

265 266 267
  fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}

268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
void consume_repeatly(tmq_t* tmq) {
  int32_t               numOfAssignment = 0;
  tmq_topic_assignment* pAssign = NULL;

  int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
  if (code != 0) {
    fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code));
  }

  // seek to the earliest offset
  for(int32_t i = 0; i < numOfAssignment; ++i) {
    tmq_topic_assignment* p = &pAssign[i];

    code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
    if (code != 0) {
      fprintf(stderr, "failed to seek to %ld, reason:%s", p->begin, tmq_err2str(code));
    }
  }

T
t_max 已提交
287
  tmq_free_assignment(pAssign);
288 289 290 291 292

  // let's do it again
  basic_consume_loop(tmq);
}

293 294 295 296 297 298 299 300 301 302 303 304 305
int main(int argc, char* argv[]) {
  int32_t code;

  if (init_env() < 0) {
    return -1;
  }

  if (create_topic() < 0) {
    return -1;
  }

  tmq_t* tmq = build_consumer();
  if (NULL == tmq) {
D
dapan1121 已提交
306
    fprintf(stderr, "build_consumer() fail!\n");
307 308 309 310 311 312
    return -1;
  }

  tmq_list_t* topic_list = build_topic_list();
  if (NULL == topic_list) {
    return -1;
L
Liu Jicong 已提交
313 314
  }

315
  if ((code = tmq_subscribe(tmq, topic_list))) {
D
dapan1121 已提交
316
    fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
317
  }
318

319 320 321
  tmq_list_destroy(topic_list);

  basic_consume_loop(tmq);
322

323 324
  consume_repeatly(tmq);

325 326
  code = tmq_consumer_close(tmq);
  if (code) {
D
dapan1121 已提交
327
    fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
L
Liu Jicong 已提交
328
  } else {
D
dapan1121 已提交
329
    fprintf(stderr, "Consumer closed\n");
330
  }
L
Liu Jicong 已提交
331

332 333
  return 0;
}