tmq.c 13.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include <assert.h>
L
Liu Jicong 已提交
17 18
#include <stdio.h>
#include <string.h>
L
Liu Jicong 已提交
19
#include <time.h>
L
Liu Jicong 已提交
20
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
21
#include <stdlib.h>
L
Liu Jicong 已提交
22

L
Liu Jicong 已提交
23 24 25
/*
 * Loop-control flag for the consume loops: they keep polling while it is
 * non-zero. Nothing in the visible code ever clears it.
 */
static int  running = 1;
static void msg_process(TAOS_RES* msg) {
  char buf[1024];
L
Liu Jicong 已提交
26
  /*memset(buf, 0, 1024);*/
L
Liu Jicong 已提交
27
  printf("topic: %s\n", tmq_get_topic_name(msg));
L
Liu Jicong 已提交
28
  printf("db: %s\n", tmq_get_db_name(msg));
L
Liu Jicong 已提交
29
  printf("vg: %d\n", tmq_get_vgroup_id(msg));
L
Liu Jicong 已提交
30
  if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
wmmhello's avatar
wmmhello 已提交
31 32
    tmq_raw_data *raw = tmq_get_raw_meta(msg);
    if(raw){
wmmhello's avatar
wmmhello 已提交
33
      TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0);
wmmhello's avatar
wmmhello 已提交
34 35 36
      if (pConn == NULL) {
        return;
      }
wmmhello's avatar
wmmhello 已提交
37

wmmhello's avatar
wmmhello 已提交
38
      TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 5");
wmmhello's avatar
wmmhello 已提交
39 40 41 42 43 44 45 46 47 48 49 50 51
      if (taos_errno(pRes) != 0) {
        printf("error in create db, reason:%s\n", taos_errstr(pRes));
        return;
      }
      taos_free_result(pRes);

      pRes = taos_query(pConn, "use abc1");
      if (taos_errno(pRes) != 0) {
        printf("error in use db, reason:%s\n", taos_errstr(pRes));
        return;
      }
      taos_free_result(pRes);

wmmhello's avatar
wmmhello 已提交
52 53 54 55
      int32_t ret = taos_write_raw_meta(pConn, raw);
      printf("write raw data: %s\n", tmq_err2str(ret));
      taos_close(pConn);
    }
wmmhello's avatar
wmmhello 已提交
56
    tmq_free_raw_meta(raw);
wmmhello's avatar
wmmhello 已提交
57 58 59 60
    char* result = tmq_get_json_meta(msg);
    if(result){
      printf("meta result: %s\n", result);
    }
wmmhello's avatar
wmmhello 已提交
61
    tmq_free_json_meta(result);
L
Liu Jicong 已提交
62 63
    return;
  }
L
Liu Jicong 已提交
64 65 66 67 68 69 70
  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
    if (row == NULL) break;
    TAOS_FIELD* fields = taos_fetch_fields(msg);
    int32_t     numOfFields = taos_field_count(msg);
    taos_print_row(buf, row, fields, numOfFields);
    printf("%s\n", buf);
L
Liu Jicong 已提交
71 72 73 74 75

    const char* tbName = tmq_get_table_name(msg);
    if (tbName) {
      printf("from tb: %s\n", tbName);
    }
L
Liu Jicong 已提交
76 77
  }
}
L
Liu Jicong 已提交
78 79 80 81 82 83 84

int32_t init_env() {
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

wmmhello's avatar
wmmhello 已提交
85
  TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 5");
L
Liu Jicong 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98
  if (taos_errno(pRes) != 0) {
    printf("error in create db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "use abc1");
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

99
  pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)");
100
  if (taos_errno(pRes) != 0) {
L
Liu Jicong 已提交
101
    printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
102 103 104 105
    return -1;
  }
  taos_free_result(pRes);

wmmhello's avatar
wmmhello 已提交
106
  pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
107 108 109 110 111 112
  if (taos_errno(pRes) != 0) {
    printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

L
Liu Jicong 已提交
113 114 115 116 117 118 119
  pRes = taos_query(pConn, "insert into ct0 values(now, 1, 2, 'a')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

wmmhello's avatar
wmmhello 已提交
120
  pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)");
121
  if (taos_errno(pRes) != 0) {
L
Liu Jicong 已提交
122 123 124 125 126
    printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

wmmhello's avatar
wmmhello 已提交
127
  pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)");
wmmhello's avatar
wmmhello 已提交
128 129 130 131 132 133
  if (taos_errno(pRes) != 0) {
    printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

L
Liu Jicong 已提交
134 135 136
  pRes = taos_query(pConn, "insert into ct1 values(now, 3, 4, 'b')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
137 138
    return -1;
  }
L
Liu Jicong 已提交
139
  taos_free_result(pRes);
L
fix  
Liu Jicong 已提交
140

wmmhello's avatar
wmmhello 已提交
141
  pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)");
L
fix  
Liu Jicong 已提交
142
  if (taos_errno(pRes) != 0) {
L
Liu Jicong 已提交
143
    printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes));
L
fix  
Liu Jicong 已提交
144 145
    return -1;
  }
L
Liu Jicong 已提交
146
  taos_free_result(pRes);
L
fix  
Liu Jicong 已提交
147

L
Liu Jicong 已提交
148 149 150 151 152
  pRes = taos_query(pConn, "insert into ct3 values(now, 5, 6, 'c')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
153
  taos_free_result(pRes);
L
Liu Jicong 已提交
154

wmmhello's avatar
wmmhello 已提交
155 156 157 158 159 160
  pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
  if (taos_errno(pRes) != 0) {
    printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
wmmhello's avatar
wmmhello 已提交
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182

  pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
  if (taos_errno(pRes) != 0) {
    printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
  if (taos_errno(pRes) != 0) {
    printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
  if (taos_errno(pRes) != 0) {
    printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

wmmhello's avatar
wmmhello 已提交
183 184 185 186 187 188 189 190 191 192 193 194 195
  pRes = taos_query(pConn, "drop table ct3 ct1");
  if (taos_errno(pRes) != 0) {
    printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "drop table st1");
  if (taos_errno(pRes) != 0) {
    printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
wmmhello's avatar
wmmhello 已提交
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223

  pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
  if (taos_errno(pRes) != 0) {
    printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
  if (taos_errno(pRes) != 0) {
    printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
  if (taos_errno(pRes) != 0) {
    printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
  if (taos_errno(pRes) != 0) {
    printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
wmmhello's avatar
wmmhello 已提交
224 225 226 227 228 229 230

  pRes = taos_query(pConn, "alter table n1 comment 'hello'");
  if (taos_errno(pRes) != 0) {
    printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
wmmhello's avatar
wmmhello 已提交
231 232 233 234 235 236 237 238

  pRes = taos_query(pConn, "alter table n1 drop column c1");
  if (taos_errno(pRes) != 0) {
    printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

wmmhello's avatar
wmmhello 已提交
239 240 241 242 243 244
  pRes = taos_query(pConn, "drop table n1");
  if (taos_errno(pRes) != 0) {
    printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
wmmhello's avatar
wmmhello 已提交
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259

  pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

wmmhello's avatar
wmmhello 已提交
260 261 262 263 264 265 266
  pRes = taos_query(pConn, "create table jt2 using jt tags('')");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

267 268 269 270 271 272 273 274 275 276 277 278 279 280
  pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)");
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  pRes = taos_query(pConn, "drop table st1");
  if (taos_errno(pRes) != 0) {
    printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

L
Liu Jicong 已提交
281 282 283 284
  return 0;
}

int32_t create_topic() {
L
Liu Jicong 已提交
285
  printf("create topic\n");
L
Liu Jicong 已提交
286 287 288 289 290 291 292 293 294 295 296 297
  TAOS_RES* pRes;
  TAOS*     pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }

  pRes = taos_query(pConn, "use abc1");
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
298

299 300
  pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
  /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
L
fix  
Liu Jicong 已提交
301 302 303 304 305 306 307 308
  if (taos_errno(pRes) != 0) {
    printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

#if 0
  pRes = taos_query(pConn, "insert into tu1 values(now, 1, 1.0, 'bi1')");
309
  if (taos_errno(pRes) != 0) {
L
fix  
Liu Jicong 已提交
310
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
311 312 313
    return -1;
  }
  taos_free_result(pRes);
L
fix  
Liu Jicong 已提交
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
  pRes = taos_query(pConn, "insert into tu1 values(now+1d, 1, 1.0, 'bi1')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
  pRes = taos_query(pConn, "insert into tu2 values(now, 2, 2.0, 'bi2')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
  pRes = taos_query(pConn, "insert into tu2 values(now+1d, 2, 2.0, 'bi2')");
  if (taos_errno(pRes) != 0) {
    printf("failed to insert, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
#endif

L
Liu Jicong 已提交
334 335 336 337
  taos_close(pConn);
  return 0;
}

L
Liu Jicong 已提交
338 339
/*
 * Commit callback: prints the commit result code together with the consumer
 * handle and the user parameter it was invoked with.
 *
 * tmq:   consumer the commit belongs to
 * code:  result code of the commit
 * param: opaque user pointer supplied when the callback was registered
 */
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
  /* %p expects a void*, so the consumer handle is converted explicitly. */
  printf("commit %d tmq %p param %p\n", code, (void*)tmq, param);
}

L
Liu Jicong 已提交
342
tmq_t* build_consumer() {
L
Liu Jicong 已提交
343
#if 0
L
Liu Jicong 已提交
344 345 346 347 348 349 350 351
  TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  TAOS_RES* pRes = taos_query(pConn, "use abc1");
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
  }
  taos_free_result(pRes);
L
Liu Jicong 已提交
352
#endif
L
Liu Jicong 已提交
353 354 355

  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "tg2");
L
Liu Jicong 已提交
356 357
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
358
  tmq_conf_set(conf, "msg.with.table.name", "true");
L
Liu Jicong 已提交
359
  tmq_conf_set(conf, "enable.auto.commit", "true");
L
Liu Jicong 已提交
360

361
  /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
L
Liu Jicong 已提交
362

L
Liu Jicong 已提交
363
  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
L
Liu Jicong 已提交
364
  tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
L
Liu Jicong 已提交
365
  assert(tmq);
L
Liu Jicong 已提交
366
  tmq_conf_destroy(conf);
L
Liu Jicong 已提交
367 368 369 370 371
  return tmq;
}

/*
 * Returns a newly created topic list holding the single entry
 * "topic_ctb_column".
 */
tmq_list_t* build_topic_list() {
  tmq_list_t* topics = tmq_list_new();

  /* alternative topic, currently disabled: "tmq_test_db_multi_insert_topic" */
  tmq_list_append(topics, "topic_ctb_column");

  return topics;
}

377
/*
 * Subscribes `tmq` to `topics` and prints every polled message until the
 * global `running` flag becomes zero, then closes the consumer.
 *
 * A subscribe failure is reported on stderr and stdout and ends the function
 * without polling. Each poll is issued with a timeout argument of -1.
 */
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
  int32_t code = tmq_subscribe(tmq, topics);
  if (code != 0) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
    printf("subscribe err\n");
    return;
  }

  /* tally of handled messages; not read anywhere in this function */
  int32_t handled = 0;

  while (running) {
    TAOS_RES* polled = tmq_consumer_poll(tmq, -1);
    if (polled == NULL) {
      continue;
    }

    handled++;
    msg_process(polled);
    taos_free_result(polled);
  }

  code = tmq_consumer_close(tmq);
  if (code != 0) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
  } else {
    fprintf(stderr, "%% Consumer closed\n");
  }
}

407
/*
 * Subscribes `tmq` to `topics`, prints the list of subscribed topics, then
 * polls with a 1000 timeout argument and prints every message until the
 * global `running` flag becomes zero. The consumer is closed at the end.
 *
 * A subscribe failure is reported on stderr and ends the function. A failure
 * to read back the subscription is reported on stderr but does not prevent
 * polling. No explicit commit is issued from this loop.
 */
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
  int32_t code = tmq_subscribe(tmq, topics);
  if (code != 0) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
    return;
  }

  /* Read the subscription back; only touch the list when the call succeeded. */
  tmq_list_t* subList = NULL;
  code = tmq_subscription(tmq, &subList);
  if (code != 0) {
    fprintf(stderr, "%% Failed to get subscribed topics: %s\n", tmq_err2str(code));
  } else if (subList != NULL) {
    char**  subTopics = tmq_list_to_c_array(subList);
    int32_t sz = tmq_list_get_size(subList);
    printf("subscribed topics: ");
    for (int32_t i = 0; i < sz; i++) {
      printf("%s, ", subTopics[i]);
    }
    printf("\n");
  }
  if (subList != NULL) {
    tmq_list_destroy(subList);
  }

  while (running) {
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
    if (tmqmessage) {
      msg_process(tmqmessage);
      taos_free_result(tmqmessage);
    }
  }

  code = tmq_consumer_close(tmq);
  if (code != 0) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
  } else {
    fprintf(stderr, "%% Consumer closed\n");
  }
}

L
Liu Jicong 已提交
447 448 449
int main(int argc, char* argv[]) {
  if (argc > 1) {
    printf("env init\n");
L
Liu Jicong 已提交
450 451 452
    if (init_env() < 0) {
      return -1;
    }
L
fix  
Liu Jicong 已提交
453
    create_topic();
L
Liu Jicong 已提交
454
  }
455
  tmq_t*      tmq = build_consumer();
L
Liu Jicong 已提交
456
  tmq_list_t* topic_list = build_topic_list();
457 458
  basic_consume_loop(tmq, topic_list);
  /*sync_consume_loop(tmq, topic_list);*/
L
Liu Jicong 已提交
459
}