tmq.c 90.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
// Forward declaration; called from tmqHandleAllDelayedTask() before its definition appears.
int32_t tmqAskEp(tmq_t* tmq, bool async);

// Process-wide TMQ bookkeeping: an init flag plus the timer handle used by this file.
typedef struct {
  int8_t inited;  // flipped 0 -> 1 with an atomic compare-exchange in tmq_consumer_new()
  tmr_h  timer;   // handle returned by taosTmrInit(); passed to the taosTmrReset() calls in this file
} SMqMgmt;

// The single shared instance, zero-initialized.
static SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
// Common leading fields of a queued response. SMqAskEpRspWrapper and SMqPollRspWrapper
// start with the same two members, and tmqClearUnhandleMsg() reads queue items through this type.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60 61 62
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
  int8_t  ssEnable;
  int32_t ssBatchSize;

  bool hbBgEnable;

63 64 65 66 67
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
68
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
69
  void*          commitCbUserParam;
L
Liu Jicong 已提交
70 71 72
};

struct tmq_t {
L
Liu Jicong 已提交
73
  // conf
74 75 76 77 78 79 80 81 82 83 84
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
85 86
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
87 88 89 90

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
91 92
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
93
  int32_t epSkipCnt;
L
Liu Jicong 已提交
94
#endif
L
Liu Jicong 已提交
95 96
  int64_t pollCnt;

L
Liu Jicong 已提交
97
  // timer
98 99
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
100 101 102
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
103 104 105 106
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
107
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
108
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
109
  STaosQall*  qall;
L
Liu Jicong 已提交
110 111 112 113
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
114 115
};

X
Xiaoyu Wang 已提交
116 117 118 119 120 121 122 123
// Vgroup status values (presumably stored in SMqClientVg.vgStatus -- confirm against users).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status values (presumably stored in tmq_t.status -- confirm against users).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
};

L
Liu Jicong 已提交
127
// Task types written to tmq_t.delayedTask by the timer callbacks and
// dispatched in tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
133
typedef struct {
134 135 136
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
137 138 139 140
  /*int64_t      committedOffset;*/
  /*int64_t      currentOffset;*/
  STqOffsetVal committedOffsetNew;
  STqOffsetVal currentOffsetNew;
L
Liu Jicong 已提交
141
  // connection info
142
  int32_t vgId;
X
Xiaoyu Wang 已提交
143
  int32_t vgStatus;
L
Liu Jicong 已提交
144
  int32_t vgSkipCnt;
145 146 147
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
148
typedef struct {
149
  // subscribe info
L
Liu Jicong 已提交
150
  char* topicName;
L
Liu Jicong 已提交
151
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
152 153 154

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
155 156
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
  };
L
Liu Jicong 已提交
168 169
} SMqPollRspWrapper;

L
Liu Jicong 已提交
170
typedef struct {
L
Liu Jicong 已提交
171 172 173
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
174
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
175

L
Liu Jicong 已提交
176
typedef struct {
177
  tmq_t*  tmq;
L
Liu Jicong 已提交
178
  int32_t code;
L
Liu Jicong 已提交
179
  int32_t async;
X
Xiaoyu Wang 已提交
180
  tsem_t  rspSem;
181 182
} SMqAskEpCbParam;

L
Liu Jicong 已提交
183
typedef struct {
L
Liu Jicong 已提交
184 185
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
186
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
187
  int32_t         epoch;
L
Liu Jicong 已提交
188
  int32_t         vgId;
L
Liu Jicong 已提交
189
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
190
} SMqPollCbParam;
191

L
Liu Jicong 已提交
192
#if 0
// Disabled: context of the old single-request commit path (tmqCommitCb / tmqCommitInner).
typedef struct {
  tmq_t*         tmq;
  int8_t         async;
  int8_t         automatic;
  int8_t         freeOffsets;
  tmq_commit_cb* userCb;
  tsem_t         rspSem;
  int32_t        rspErr;
  SArray*        offsets;
  void*          userParam;
} SMqCommitCbParam;
#endif
L
Liu Jicong 已提交
205

206
typedef struct {
L
Liu Jicong 已提交
207 208 209 210
  tmq_t* tmq;
  int8_t automatic;
  int8_t async;
  /*int8_t         freeOffsets;*/
L
Liu Jicong 已提交
211 212
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
213
  int32_t        rspErr;
214
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
215 216 217 218
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
219 220 221 222 223 224 225
} SMqCommitCbParamSet;

// Per-request callback context built in tmqSendCommitReq(): links one commit request
// to the shared parameter set and to the offset it carries.
typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
} SMqCommitCbParam2;

226
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
227
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
228
  conf->withTbName = false;
L
Liu Jicong 已提交
229
  conf->autoCommit = true;
L
Liu Jicong 已提交
230
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
231
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
232 233 234
  return conf;
}

L
Liu Jicong 已提交
235
// Releases a configuration together with its heap-allocated ip/user/pass strings.
// A NULL conf is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
245 246
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
247
    return TMQ_CONF_OK;
248
  }
L
Liu Jicong 已提交
249

250 251
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
252 253
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
254

L
Liu Jicong 已提交
255 256
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
257
      conf->autoCommit = true;
L
Liu Jicong 已提交
258 259
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
260
      conf->autoCommit = false;
L
Liu Jicong 已提交
261 262 263 264
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
265
  }
L
Liu Jicong 已提交
266

L
Liu Jicong 已提交
267 268 269 270 271
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
286

287 288
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
289
      conf->withTbName = true;
L
Liu Jicong 已提交
290
      return TMQ_CONF_OK;
291
    } else if (strcmp(value, "false") == 0) {
292
      conf->withTbName = false;
L
Liu Jicong 已提交
293
      return TMQ_CONF_OK;
294 295 296 297 298
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
299
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
300
    if (strcmp(value, "true") == 0) {
301 302 303 304 305 306 307 308 309 310 311 312 313
      conf->ssEnable = true;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
      conf->ssEnable = false;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
314 315
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
316
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
317 318 319 320
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
321
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
322 323
  }

L
Liu Jicong 已提交
324
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
325
    conf->ssBatchSize = atoi(value);
L
Liu Jicong 已提交
326 327 328
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
329
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
330 331 332
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
333
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
334 335 336
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
337
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
338 339 340
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
341
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
342 343 344
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
345
  if (strcmp(key, "td.connect.db") == 0) {
346
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
347 348 349
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
350
  return TMQ_CONF_UNKNOWN;
351 352 353
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
354 355
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
356 357
}

L
Liu Jicong 已提交
358 359
// Appends a heap copy of src to the list.
// Names that do not start with a backquote are lower-cased via strtolower().
// Returns 0 on success, -1 for a NULL list, a NULL/empty name or an allocation failure.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) return -1;  // the original dereferenced the copy unchecked

  if (topic[0] != '`') {
    strtolower(topic, src);
  }
  if (taosArrayPush(container, &topic) == NULL) {
    // the list did not take ownership, so release the copy here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
369
// Destroys the list, releasing every stored topic string with taosMemoryFree.
void tmq_list_destroy(tmq_list_t* list) {
  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
374 375 376 377 378 379 380 381 382 383
// Returns the number of entries held by the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Exposes the list's underlying element storage as an array of C strings.
// The returned pointer is the container's own buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
384 385 386 387
// Formats "<topicName>:<vg>" into dst and returns the number of characters written
// (terminator excluded). dst must be large enough; no bound is checked here.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
388
#if 0
// Disabled: response callback of the old single-request commit path.
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code;
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
    }

    if (pParam->freeOffsets) {
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}
#endif
L
Liu Jicong 已提交
410

D
dapan1121 已提交
411
// Response callback for one commit request sent by tmqSendCommitReq().
//
// Records a failing response code in the shared parameter set, then counts the outstanding
// responses down. When the count reaches zero:
//   - async:  tmq->commitCb (automatic) or userCb (manual) is invoked with rspErr;
//   - sync:   rspSem is posted so the waiting caller can continue.
// Always returns 0.
//
// NOTE(review): neither pParam, pParam->pOffset nor (in the async case) pParamSet is
// released here -- confirm who owns them. Freeing pParamSet here would only be safe once
// the issuing loop can no longer touch it.
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  // The original ignored `code`, so rspErr stayed 0 and failures were never reported.
  if (code != 0) {
    pParamSet->rspErr = code;
  }

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    // no more waiting rsp
    if (pParamSet->async) {
      if (pParamSet->automatic && pParamSet->tmq->commitCb) {
        pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
      } else if (!pParamSet->automatic && pParamSet->userCb) {
        pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
      }
    } else {
      tsem_post(&pParamSet->rspSem);
    }
  }
  return 0;
}

L
Liu Jicong 已提交
452 453 454 455 456 457 458
// Builds and sends one TDMT_VND_MQ_COMMIT_OFFSET request carrying pVg->currentOffsetNew
// for the "<groupId><TMQ_SEPARATOR><topicName>" subscription key.
//
// On success the request is counted in pParamSet (waitingRspNum / totalRspNum), the
// vgroup's committed offset is advanced optimistically, and 0 is returned; the response
// arrives in tmqCommitCb2(). On failure nothing is sent, every local allocation is
// released, and -1 is returned.
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  void*              buf = NULL;
  SMqCommitCbParam2* pParam = NULL;
  SMsgSendInfo*      pMsgSendInfo = NULL;
  int32_t            len = 0;
  int32_t            code = 0;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffsetNew;

  // NOTE(review): subKey's capacity is not visible here; the copy below is unbounded.
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  {
    // the encoded offset follows the message head
    void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, len);
    tEncodeSTqOffset(&encoder, pOffset);
    tEncoderClear(&encoder);
  }

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffsetNew = pVg->currentOffsetNew;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->fp = tmqCommitCb2;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // Count the request BEFORE sending: tmqCommitCb2 may run on another thread and
  // decrement waitingRspNum as soon as the message is out.
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  // send msg
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  if (pParam) taosMemoryFree(pParam);
  if (buf) taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

// Commits the offset of the single (topic, vgroup) that `msg` came from.
//
// msg must be a TMQ data or meta result; anything else returns TSDB_CODE_TMQ_INVALID_MSG.
// If the vgroup has nothing new to commit, 0 is returned without sending.
// Sync  (async == 0): waits for the response and returns its recorded error code;
//                     a failure to build or send the request returns -1.
// Async (async != 0): returns 0; the result is reported through userCb (if non-NULL).
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = -1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
        }
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    // nothing was sent, so no callback will ever look at pParamSet
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    // Exactly one request was sent and its callback has posted: nobody else holds
    // pParamSet any more (the original leaked it here).
    taosMemoryFree(pParamSet);
    return code;
  }

  // Async: the callback reports the result.
  // NOTE(review): pParamSet is not released on this path by anything visible here.
  return 0;

FAIL:
  // tmqSendCommitReq() fails only before sending, so pParamSet is still private to us.
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  if (async) {
    if (userCb) userCb(tmq, code, userParam);
    return 0;
  }
  return code;
}

// Commits offsets.
//
// With a non-NULL msg this delegates to tmqCommitMsgImpl() (single vgroup).
// With msg == NULL it walks every vgroup of every subscribed topic and sends a commit
// request for each one whose current offset differs from its committed offset; requests
// that fail to send are skipped.
// Sync  (async == 0): waits on rspSem and returns the recorded response error.
// Async (async != 0): returns 0; tmqCommitCb2() reports through the commit callbacks.
//
// NOTE(review): pParamSet is never released once at least one request has been sent.
// Freeing it after tsem_wait() is not safe as written: tmqCommitCb2() may see the counter
// reach zero (and post) while later requests of this loop are still being issued, so a
// late callback could still touch it. Fixing that needs the countdown to hold an extra
// reference for the issuing thread.
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                        void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        // best effort: a vgroup whose request cannot be built is skipped
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
      }
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // nothing was sent, so no callback will ever look at pParamSet
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    int32_t code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    // The original computed this code and then returned 0 regardless.
    return code;
  }

  // Async: the original's "code != 0 && async" callback block could never run
  // (code was forced to 0 on this path), so it has been removed.
  return 0;
}

L
Liu Jicong 已提交
661 662
#if 0
// Disabled: old commit path that sent all offsets to the mnode in a single request.
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
  int8_t               freeOffsets;
  int32_t              code = -1;

  if (msg == NULL) {
    freeOffsets = 1;
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
    freeOffsets = 0;
    pOffsets = (SArray*)&msg->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  SEncoder encoder;

  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  if (code < 0) {
    goto END;
  }
  int32_t tlen = encoder.pos;
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tEncoderClear(&encoder);
    goto END;
  }
  tEncoderClear(&encoder);

  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->freeOffsets = freeOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  } else {
    code = 0;
  }

  // avoid double free if msg is sent
  buf = NULL;

END:
  if (buf) taosMemoryFree(buf);
  /*if (pParam) taosMemoryFree(pParam);*/
  /*if (sendInfo) taosMemoryFree(sendInfo);*/

  if (code != 0 && async) {
    if (automatic) {
      tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
    } else {
      userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

  if (!async && freeOffsets) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}
#endif
L
Liu Jicong 已提交
777

778
void tmqAssignAskEpTask(void* param, void* tmrId) {
L
Liu Jicong 已提交
779
  tmq_t*  tmq = (tmq_t*)param;
780
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
781
  *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
L
Liu Jicong 已提交
782
  taosWriteQitem(tmq->delayedTask, pTaskType);
783
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
784 785 786 787
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
788
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
789 790
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
791
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
792 793 794 795
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
796
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
797 798
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
799
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
800 801
}

802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841
// Heartbeat response callback: releases the response payload, if any, and ignores the
// result code. Always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL && pMsg->pData != NULL) {
    taosMemoryFree(pMsg->pData);
  }
  return 0;
}

// Timer callback: sends one TDMT_MND_MQ_HB heartbeat (consumer id + epoch) to the mnode
// endpoint set and re-arms itself to fire again in 1000 ms, whether or not the send could
// be attempted. param is the tmq_t*; tmrId is unused.
void tmqSendHbReq(void* param, void* tmrId) {
  // TODO replace with ref
  tmq_t*    tmq = (tmq_t*)param;
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
    // The original fell through and dereferenced the NULL sendInfo.
    goto OVER;
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
  taosTmrReset(tmqSendHbReq, 1000, tmq, tmqMgmt.timer, &tmq->hbLiveTimer);
}

L
Liu Jicong 已提交
842 843 844 845 846 847 848 849
// Drains the consumer's delayed-task queue and runs each task:
//   ASK_EP -> tmqAskEp(async) and re-arm epTimer (1000 ms);
//   COMMIT -> automatic async commit of all offsets and re-arm commitTimer;
//   REPORT -> currently a no-op.
// Returns 0, or -1 if the temporary qall cannot be allocated.
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    // tasks stay queued and are picked up by the next call
    return -1;
  }
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);
      taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do yet
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
866
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
867
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
868 869 870 871 872 873 874 875
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
876
  msg = NULL;
L
Liu Jicong 已提交
877 878 879 880 881 882 883 884 885 886
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
887
// Subscribe response callback: stores the response code in the caller's parameter block
// and posts its semaphore. Always returns 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  /*tmq_t* tmq = pParam->tmq;*/
  tsem_post(&pParam->rspSem);
  return 0;
}
894

L
Liu Jicong 已提交
895
// Appends the names of the currently subscribed topics to *topics, creating the list
// when *topics is NULL. Each name is reported without the prefix up to and including
// its first '.'.
// Returns 0, or -1 if the list has to be created and cannot be allocated.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) return -1;
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    // The original computed strchr(...) + 1 unconditionally, which is invalid when the
    // name contains no '.'; fall back to the whole name in that case.
    const char* dot = strchr(topic->topicName, '.');
    const char* shortName = (dot != NULL) ? dot + 1 : topic->topicName;
    tmq_list_append(*topics, shortName);
  }
  return 0;
}

L
Liu Jicong 已提交
906 907 908
// Unsubscribes from everything by subscribing to an empty topic list.
// Returns the result of tmq_subscribe().
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  int32_t     rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
913
#if 0
914
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
915
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
916 917 918 919 920 921
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
922
  pTmq->epoch = 0;
L
fix txn  
Liu Jicong 已提交
923
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
924
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
925
  // set conf
926 927
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
928
  pTmq->autoCommit = conf->autoCommit;
929
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
930
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
931

L
Liu Jicong 已提交
932 933 934 935 936 937 938 939 940 941
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
942 943
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
944 945
  return pTmq;
}
L
Liu Jicong 已提交
946
#endif
L
Liu Jicong 已提交
947

L
Liu Jicong 已提交
948
/*
 * Create a consumer from the given configuration.
 *
 * conf      : configuration; conf->groupId must be non-empty (asserted).
 *             A NULL user/pass falls back to TSDB_DEFAULT_USER / TSDB_DEFAULT_PASS.
 * errstr    : not used by this function
 * errstrLen : not used by this function
 *
 * On first use the module-wide TMQ timer is created. The consumer then gets
 * its queues, a copy of the configuration, a generated consumer id, a
 * response semaphore and a connection; when conf->hbBgEnable is set the
 * heartbeat timer is started as well.
 *
 * Returns the new consumer, or NULL on failure. Out-of-memory failures set
 * terrno to TSDB_CODE_OUT_OF_MEMORY.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here: only conf may be consulted for the log line
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  // set conf and assign consumerId first, so that every failure log below
  // reports the real consumer id and group
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->ssEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->hbBgEnable = conf->hbBgEnable;
  pTmq->consumerId = tGenIdPI64();

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  if (pTmq->hbBgEnable) {
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  // release whatever was created before the failure
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1039 1040
#if 0
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
L
Liu Jicong 已提交
1041
  return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
L
Liu Jicong 已提交
1042
}
L
Liu Jicong 已提交
1043
#endif
L
Liu Jicong 已提交
1044

L
Liu Jicong 已提交
1045
/*
 * Subscribe the consumer to the topics in topic_list (an empty list
 * unsubscribes from everything, see tmq_unsubscribe()).
 *
 * Builds an SCMSubscribeReq holding the full name of every topic, sends it
 * to the mnode and blocks until tmqSubscribeCb() reports the result. On
 * success it keeps asking for endpoints while the server answers
 * TSDB_CODE_MND_CONSUMER_NOT_READY, then starts the ask-ep timer and, when
 * auto commit is enabled, the commit timer (each only if not yet running).
 *
 * Returns 0 on success, -1 on a local failure (allocation or semaphore
 * setup), otherwise the non-zero code delivered to the callback.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    // req.topicNames owns topicFName from here on (released at FAIL)
    taosArrayPush(req.topicNames, &topicFName);
  }

  // first call computes the encoded length, second call fills the buffer
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .tmq = tmq,
  };

  // set up the semaphore before allocating sendInfo, so a semaphore failure
  // cannot leak sendInfo
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&param.rspSem);
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  // reached on success as well: the request's topic names are always released
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1142
/*
 * Register the commit callback and its user parameter on a configuration.
 * tmq_consumer_new() copies both values into the consumer it creates.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1147

D
dapan1121 已提交
1148
/*
 * Response callback for a poll request.
 *
 * param : SMqPollCbParam allocated by the sender; freed here
 * pMsg  : response; pMsg->pData is released here on every path
 * code  : response code
 *
 * A successful response from the current (or a newer) epoch is decoded into
 * an SMqPollRspWrapper and written to tmq->mqueue. A response from an
 * earlier epoch is dropped. For code TSDB_CODE_TQ_NO_COMMITTED_OFFSET an
 * END_RSP wrapper is queued instead. tmq->rspSem is posted on every path.
 *
 * Returns 0 when a response was queued or a stale one was dropped, -1
 * otherwise; on the -1 path the vgroup is put back to IDLE if the request
 * epoch still matches the consumer epoch.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  tmq_t*          tmq = pParam->tmq;
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);

  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        // pMsg->pData was already released above; it must not be freed again
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // decode the payload that follows the response head, then copy the head
  // itself over the start of the decoded response
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);

  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;

CREATE_MSG_FAIL:
  // pVg is only touched while the request still belongs to the current epoch
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

1233 1234 1235 1236 1237
/*
 * Replace the consumer's topic/vgroup table with the one carried by an
 * ask-ep response, keeping the current offset of every vgroup that exists
 * both before and after the update.
 *
 * tmq   : consumer to update
 * epoch : epoch of the response; stored into tmq->epoch at the end
 * pRsp  : decoded ask-ep response
 *
 * Offsets are carried over through a temporary hash keyed by
 * "<topicName>:<vgId>". A vgroup without a previous entry starts from an
 * offset whose type is tmq->resetOffsetCfg. The consumer status becomes
 * NO_TOPIC when the new table is empty and READY otherwise.
 *
 * Returns true if at least one vgroup was installed; false if none was or
 * if the temporary containers could not be allocated (state untouched).
 *
 * NOTE(review): the old clientTopics array is destroyed with
 * taosArrayDestroy() only; the topicName strings and vgs arrays of its
 * elements do not appear to be released here - confirm whether something
 * else owns them before changing this.
 */
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool    anyVg = false;
  int32_t numNewTopics = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];

  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           numNewTopics);

  SArray* newTopics = taosArrayInit(numNewTopics, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* savedOffsets = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (savedOffsets == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // pass 1: remember the current offset of every known vgroup
  int32_t numOldTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < numOldTopics; t++) {
    SMqClientTopic* oldTopic = taosArrayGet(tmq->clientTopics, t);
    if (oldTopic->vgs == NULL) {
      continue;
    }

    int32_t numOldVgs = taosArrayGetSize(oldTopic->vgs);
    tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, numOldVgs);

    for (int32_t v = 0; v < numOldVgs; v++) {
      SMqClientVg* oldVg = taosArrayGet(oldTopic->vgs, v);
      sprintf(vgKey, "%s:%d", oldTopic->topicName, oldVg->vgId);

      char offsetStr[80];
      tFormatOffset(offsetStr, 80, &oldVg->currentOffsetNew);
      tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
               oldVg->vgId, vgKey, offsetStr);

      taosHashPut(savedOffsets, vgKey, strlen(vgKey), &oldVg->currentOffsetNew, sizeof(STqOffsetVal));
    }
  }

  // pass 2: build the new table from the response
  for (int32_t t = 0; t < numNewTopics; t++) {
    SMqSubTopicEp* srcTopic = taosArrayGet(pRsp->topics, t);

    SMqClientTopic newTopic = {0};
    newTopic.schema = srcTopic->schema;
    newTopic.topicName = strdup(srcTopic->topic);
    tstrncpy(newTopic.db, srcTopic->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, newTopic.topicName);

    int32_t numNewVgs = taosArrayGetSize(srcTopic->vgs);
    newTopic.vgs = taosArrayInit(numNewVgs, sizeof(SMqClientVg));

    for (int32_t v = 0; v < numNewVgs; v++) {
      SMqSubVgEp* srcVg = taosArrayGet(srcTopic->vgs, v);
      sprintf(vgKey, "%s:%d", newTopic.topicName, srcVg->vgId);

      // default start position unless an offset was saved in pass 1
      STqOffsetVal  startOffset = {.type = tmq->resetOffsetCfg};
      STqOffsetVal* saved = taosHashGet(savedOffsets, vgKey, strlen(vgKey));
      if (saved != NULL) {
        startOffset = *saved;
      }

      SMqClientVg newVg = {
          .pollCnt = 0,
          .currentOffsetNew = startOffset,
          .vgId = srcVg->vgId,
          .epSet = srcVg->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(newTopic.vgs, &newVg);
      anyVg = true;
    }

    taosArrayPush(newTopics, &newTopic);
  }

  // swap in the new table
  if (tmq->clientTopics) {
    taosArrayDestroy(tmq->clientTopics);
  }
  taosHashCleanup(savedOffsets);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  }

  atomic_store_32(&tmq->epoch, epoch);
  return anyVg;
}

L
Liu Jicong 已提交
1316
#if 0
L
Liu Jicong 已提交
1317
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
L
Liu Jicong 已提交
1318
  /*printf("call update ep %d\n", epoch);*/
X
Xiaoyu Wang 已提交
1319
  bool    set = false;
L
Liu Jicong 已提交
1320 1321
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
S
Shengliang Guan 已提交
1322
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
L
Liu Jicong 已提交
1323
           topicNumGet);
L
Liu Jicong 已提交
1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
X
Xiaoyu Wang 已提交
1336 1337
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
L
Liu Jicong 已提交
1338
    topic.schema = pTopicEp->schema;
L
Liu Jicong 已提交
1339
    taosHashClear(pHash);
X
Xiaoyu Wang 已提交
1340
    topic.topicName = strdup(pTopicEp->topic);
L
Liu Jicong 已提交
1341
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
1342

S
Shengliang Guan 已提交
1343
    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);
L
Liu Jicong 已提交
1344 1345 1346 1347 1348 1349
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
S
Shengliang Guan 已提交
1350
        tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
L
Liu Jicong 已提交
1351 1352 1353 1354
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
S
Shengliang Guan 已提交
1355
          tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
L
Liu Jicong 已提交
1356 1357 1358 1359 1360 1361 1362 1363 1364
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
X
Xiaoyu Wang 已提交
1365
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
L
Liu Jicong 已提交
1366 1367 1368
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
S
Shengliang Guan 已提交
1369
      tscDebug("consumer:%" PRId64 ", (epoch %d) original offset of vgId:%d is %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
L
Liu Jicong 已提交
1370 1371
      if (pOffset != NULL) {
        offset = *pOffset;
S
Shengliang Guan 已提交
1372
        tscDebug("consumer:%" PRId64 ", (epoch %d) receive offset of vgId:%d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
1373
                 vgKey);
L
Liu Jicong 已提交
1374
      }
S
Shengliang Guan 已提交
1375
      tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
X
Xiaoyu Wang 已提交
1376 1377
      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1378
          .currentOffset = offset,
X
Xiaoyu Wang 已提交
1379 1380 1381
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
1382
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
1383 1384 1385 1386
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
1387
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
1388
  }
L
Liu Jicong 已提交
1389
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
L
Liu Jicong 已提交
1390
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1391
  tmq->clientTopics = newTopics;
1392

1393 1394 1395 1396
  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1397

X
Xiaoyu Wang 已提交
1398 1399 1400
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}
L
Liu Jicong 已提交
1401
#endif
X
Xiaoyu Wang 已提交
1402

D
dapan1121 已提交
1403
/*
 * Response callback for the ask-ep request sent by tmqAskEp().
 *
 * param : SMqAskEpCbParam created by tmqAskEp()
 * pMsg  : response; starts with an SMqRspHead followed by the encoded
 *         SMqAskEpRsp
 * code  : response code; stored in param->code
 *
 * A response whose epoch is not newer than the consumer's is ignored.
 * Otherwise, in sync mode the endpoint table is updated in place through
 * tmqUpdateEp2(); in async mode the decoded response is queued on
 * tmq->mqueue as an EP_RSP item and tmq->rspSem is posted.
 *
 * Ownership of param: in sync mode the waiter in tmqAskEp() is woken via
 * param->rspSem and releases it; in async mode it is released here, together
 * with the semaphore tmqAskEp() initialised in it.
 *
 * Returns code, or -1 if the queue item could not be allocated.
 *
 * NOTE(review): pMsg->pData is not freed here although tmqPollCb() frees
 * its buffer - confirm who owns it for this message type.
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = pParam->tmq;
  int8_t           async = pParam->async;
  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp2(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // pParam must not be touched after this post: the waiter frees it
    tsem_post(&pParam->rspSem);
  } else {
    // nobody waits in async mode; release the semaphore along with the param
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1456
/*
 * Ask the mnode for the consumer's current topic endpoints.
 *
 * tmq   : consumer
 * async : false - block until tmqAskEpCb() has handled the response and
 *                 return the code it recorded;
 *         true  - return right after the request is handed to the
 *                 transport; tmqAskEpCb() then owns and releases the
 *                 callback parameter.
 *
 * Returns -1 if the request, the callback parameter, its semaphore or the
 * send info cannot be set up; 0 in async mode; the recorded response code
 * in sync mode.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = 0;

  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // sync mode: this thread owns pParam and its semaphore
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1525 1526
#if 0
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
L
Liu Jicong 已提交
1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1540
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1541 1542 1543 1544 1545 1546 1547
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
L
Liu Jicong 已提交
1548
#endif
L
Liu Jicong 已提交
1549

1550
/*
 * Allocate and fill a poll request for one vgroup of one topic.
 *
 * tmq     : consumer issuing the poll
 * timeout : copied into the request
 * pTopic  : topic being polled; its name forms the second part of subKey
 * pVg     : vgroup being polled; supplies the request offset and vgId
 *
 * subKey is "<groupId><TMQ_SEPARATOR><topicName>". head.vgId and
 * head.contLen are stored through htonl().
 *
 * Returns the zero-initialised, filled request, or NULL if the allocation
 * fails.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* req = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (req == NULL) {
    return NULL;
  }

  // message head
  req->head.vgId = htonl(pVg->vgId);
  req->head.contLen = htonl(sizeof(SMqPollReq));

  // subscription key: group id, separator, topic name
  int32_t prefixLen = strlen(tmq->groupId);
  memcpy(req->subKey, tmq->groupId, prefixLen);
  req->subKey[prefixLen] = TMQ_SEPARATOR;
  strcpy(req->subKey + prefixLen + 1, pTopic->topicName);

  // consumer state
  req->consumerId = tmq->consumerId;
  req->epoch = tmq->epoch;
  req->withTbName = tmq->withTbName;
  req->useSnapshot = tmq->useSnapshot;

  // per-request values
  req->timeout = timeout;
  req->reqOffset = pVg->currentOffsetNew;
  req->reqId = generateRequestId();

  return req;
}

L
Liu Jicong 已提交
1590 1591
/*
 * Build the user-visible meta response object from a queued poll wrapper.
 *
 * Copies the topic name, db name and vgId from the wrapper's topic/vgroup
 * handles and takes a byte copy of the wrapper's metaRsp.
 *
 * Returns the new object, or NULL if the allocation fails.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1601 1602 1603
/*
 * Build the user-visible data response object from a queued poll wrapper.
 *
 * Copies the topic name, db name and vgId from the wrapper's topic/vgroup
 * handles, takes a byte copy of the wrapper's dataRsp and resets the
 * result iterator to -1. When the response does not carry its own schema,
 * the topic's schema is installed into resInfo.
 *
 * Returns the new object, or NULL if the allocation fails.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1619
// Sends one poll request to every vgroup of every subscribed topic that is currently idle.
//
// A vgroup is claimed by atomically switching its status from IDLE to WAIT. Vgroups that
// are not idle only get their skip counter incremented and are left alone. For a claimed
// vgroup the request, the callback parameter and the send-info are allocated in that
// order and handed to asyncSendMsgToServer() with tmqPollCb as the callback.
//
// Returns 0 after walking all vgroups. Returns -1 as soon as building the request or one
// of the allocations fails; the vgroup is then put back to IDLE and rspSem is posted.
// NOTE(review): the result of asyncSendMsgToServer() is not checked here.
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // Claim the vgroup; anything other than IDLE means it is not available right now.
      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__IDLE) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 skipped);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      SMqPollReq* pPollReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pPollReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      SMqPollCbParam* pCbParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pCbParam == NULL) {
        taosMemoryFree(pPollReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pCbParam->tmq = tmq;
      pCbParam->pVg = pVg;
      pCbParam->pTopic = pTopic;
      pCbParam->vgId = pVg->vgId;
      pCbParam->epoch = tmq->epoch;

      SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (pSendInfo == NULL) {
        taosMemoryFree(pPollReq);
        taosMemoryFree(pCbParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      pSendInfo->msgInfo = (SDataBuf){
          .pData = pPollReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      pSendInfo->requestId = pPollReq->reqId;
      pSendInfo->requestObjRefId = 0;
      pSendInfo->param = pCbParam;
      pSendInfo->fp = tmqPollCb;
      pSendInfo->msgType = TDMT_VND_CONSUME;

      char offsetText[80];
      tFormatOffset(offsetText, 80, &pVg->currentOffsetNew);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetText, pPollReq->reqId);

      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pSendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
1696 1697
// Handles a queued response that is not a poll response.
//
// Only endpoint (EP) responses are understood. For those, *pReset reports whether the
// response carried a newer epoch than the consumer's, in which case the consumer's
// endpoint information was updated through tmqUpdateEp2().
//
// Returns 0 for an EP response, -1 for any other type (*pReset is left untouched then).
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewerEpoch = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewerEpoch) {
    SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tmqUpdateEp2(tmq, rspWrapper->epoch, &pEpWrapper->msg);
  }

  *pReset = isNewerEpoch;
  return 0;
}

L
Liu Jicong 已提交
1714
// Drains queued responses until one of them yields a result for the application.
//
// Items are taken from tmq->qall, which is refilled from tmq->mqueue when empty.
//  - END_RSP:       terrno is set to TSDB_CODE_TQ_NO_COMMITTED_OFFSET and NULL is returned.
//  - POLL_RSP:      on a matching epoch the vgroup offset is advanced and the vgroup is set
//                   back to IDLE; a response with at least one block is returned as an
//                   SMqRspObj, an empty one is dropped.
//  - POLL_META_RSP: on a matching epoch the vgroup offset is advanced, the vgroup is set
//                   back to IDLE and an SMqMetaRspObj is returned.
//  - anything else: passed to tmqHandleNoPollRsp(); when it reports a reset and
//                   pollIfReset is true, a new round of polls is sent.
// Poll responses whose epoch differs from the consumer's are discarded.
//
// Returns the result object, or NULL when both queues are empty or an END_RSP was seen.
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
    }
    if (pItem == NULL) {
      return NULL;
    }

    int8_t rspType = pItem->tmqRspType;

    if (rspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPoll->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffsetNew = pPoll->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      // Nothing to hand out for an empty response; keep draining.
      if (pPoll->dataRsp.blockNum == 0) {
        taosFreeQitem(pPoll);
        continue;
      }

      SMqRspObj* pDataObj = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pDataObj;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPoll->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffsetNew.version = pPoll->metaRsp.rspOffset;
      pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaObj = tmqBuildMetaRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pMetaObj;
    }

    // Every remaining type goes through the non-poll handler.
    bool needReset = false;
    tmqHandleNoPollRsp(tmq, pItem, &needReset);
    taosFreeQitem(pItem);
    if (pollIfReset && needReset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1784
// Polls for the next message.
//
// tmq     - consumer handle.
// timeout - upper bound for the call, compared against taosGetTimestampMs() differences
//           (milliseconds); -1 means no deadline.
//
// Each round handles delayed tasks, sends poll requests to idle vgroups and drains the
// response queue. Returns the first result object produced, or NULL when
//  - the consumer is still in INIT status,
//  - sending the poll requests fails,
//  - terrno equals TSDB_CODE_TQ_NO_COMMITTED_OFFSET after draining, or
//  - the timeout has elapsed without a result.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int64_t startTime = taosGetTimestampMs();

  // A consumer in INIT status returns immediately, without processing delayed tasks.
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) {
      return NULL;
    }

    void* rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    }
    if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait for the time that is still LEFT. The previous code passed the elapsed time,
      // which made the first waits return almost immediately and later waits run past
      // the deadline. The * 1000 scaling is kept exactly as in the original call and the
      // fallback below; the unit tsem_timewait expects is not visible from here.
      tsem_timewait(&tmq->rspSem, (timeout - elapsed) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1828
// Closes a consumer.
//
// When the consumer is in READY status, offsets are committed synchronously and the
// consumer then subscribes to an empty topic list. The first non-zero code from either
// step is returned. In any other status nothing is done and 0 is returned.
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    // TODO: free resources
    return 0;
  }

  int32_t code = tmq_commit_sync(tmq, NULL);
  if (code != 0) {
    return code;
  }

  tmq_list_t* emptyTopics = tmq_list_new();
  code = tmq_subscribe(tmq, emptyTopics);
  tmq_list_destroy(emptyTopics);

  // TODO: free resources
  return code;
}
L
Liu Jicong 已提交
1846

L
Liu Jicong 已提交
1847 1848
// Maps a TMQ return code to a message: 0 -> "success", -1 -> "fail", every other value
// is looked up through tstrerror().
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1856

L
Liu Jicong 已提交
1857 1858 1859 1860 1861 1862 1863 1864 1865 1866
// Classifies a result handle: TMQ_RES_DATA for a data result, TMQ_RES_TABLE_META for a
// meta result, TMQ_RES_INVALID for anything else.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1867
// Returns the topic name of a TMQ result with everything up to and including the first
// '.' removed. When the stored name contains no '.', the whole stored name is returned.
// Returns NULL when res is neither a data result nor a meta result.
// The returned pointer points into the result object itself.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that result, as the previous
  // code did unconditionally, is undefined behavior and produced an invalid pointer.
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1879 1880 1881 1882
// Returns the database name of a TMQ result with everything up to and including the
// first '.' removed. When the stored name contains no '.', the whole stored name is
// returned. Returns NULL when res is neither a data result nor a meta result.
// The returned pointer points into the result object itself.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that result, as the previous
  // code did unconditionally, is undefined behavior and produced an invalid pointer.
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1891 1892 1893 1894
// Returns the vgroup id stored in a data or meta result, or -1 for any other handle.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1902 1903 1904 1905 1906 1907 1908 1909

// Returns the table name recorded for the block the result iterator currently points at.
// NULL is returned when res is not a data result, when the response carries no table
// names, or when the iterator is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!(TD_RES_TMQ(res))) {
    return NULL;
  }

  SMqRspObj* pRspObj = (SMqRspObj*)res;
  bool       hasNames = pRspObj->rsp.withTbName && pRspObj->rsp.blockTbName != NULL;
  bool       iterInRange = pRspObj->resIter >= 0 && pRspObj->resIter < pRspObj->rsp.blockNum;
  if (!hasNames || !iterInRange) {
    return NULL;
  }

  return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
}
1914

1915 1916
// Exposes the raw meta payload of a meta result through *raw.
// The pointer, length and message type are copied from the result's metaRsp; the payload
// itself is not duplicated.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when res is not a meta result or
// raw is NULL.
int32_t tmq_get_raw_meta(TAOS_RES* res, tmq_raw_data *raw) {
  if (!(TD_RES_TMQ_META(res)) || raw == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
  raw->raw_meta = pMeta->metaRsp.metaRsp;
  raw->raw_meta_len = pMeta->metaRsp.metaRspLen;
  raw->raw_meta_type = pMeta->metaRsp.resMsgType;
  return TSDB_CODE_SUCCESS;
}

1926 1927
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
                                  int8_t t) {
wmmhello's avatar
wmmhello 已提交
1928 1929 1930 1931 1932 1933 1934
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
wmmhello's avatar
wmmhello 已提交
1935

1936 1937 1938 1939
  //  char uid[32] = {0};
  //  sprintf(uid, "%"PRIi64, id);
  //  cJSON* id_ = cJSON_CreateString(uid);
  //  cJSON_AddItemToObject(json, "id", id_);
wmmhello's avatar
wmmhello 已提交
1940 1941 1942 1943
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
  cJSON_AddItemToObject(json, "tableType", tableType);
1944 1945
  //  cJSON* version = cJSON_CreateNumber(1);
  //  cJSON_AddItemToObject(json, "version", version);
wmmhello's avatar
wmmhello 已提交
1946 1947

  cJSON* columns = cJSON_CreateArray();
1948 1949 1950 1951
  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON*   column = cJSON_CreateObject();
    SSchema* s = schemaRow->pSchema + i;
    cJSON*   cname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1952 1953 1954
    cJSON_AddItemToObject(column, "name", cname);
    cJSON* ctype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(column, "type", ctype);
1955
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1956
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1957
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1958
      cJSON_AddItemToObject(column, "length", cbytes);
1959 1960 1961
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1962 1963
      cJSON_AddItemToObject(column, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1964 1965 1966 1967 1968
    cJSON_AddItemToArray(columns, column);
  }
  cJSON_AddItemToObject(json, "columns", columns);

  cJSON* tags = cJSON_CreateArray();
1969 1970 1971 1972
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON*   tag = cJSON_CreateObject();
    SSchema* s = schemaTag->pSchema + i;
    cJSON*   tname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1973 1974 1975
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(tag, "type", ttype);
1976
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1977
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1978
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1979
      cJSON_AddItemToObject(tag, "length", cbytes);
1980 1981 1982
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1983 1984
      cJSON_AddItemToObject(tag, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1985 1986 1987 1988 1989 1990 1991 1992 1993
    cJSON_AddItemToArray(tags, tag);
  }
  cJSON_AddItemToObject(json, "tags", tags);

  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  return string;
}

1994 1995 1996 1997
// Serializes an "alter super table" request to an unformatted JSON string.
//
// alterData / alterDataLen - serialized SMAlterStbReq.
//
// The JSON always carries "type", "tableName", "tableType" ("super") and "alterType".
// Depending on the alter type it also carries "colName", "colType", "colLength"
// (BINARY/NCHAR only) and "colNewName".
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when the request
// cannot be deserialized or the root object cannot be created.
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
  SMAlterStbReq req = {0};
  cJSON*        root = NULL;
  char*         text = NULL;

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) == 0 && (root = cJSON_CreateObject()) != NULL) {
    cJSON_AddItemToObject(root, "type", cJSON_CreateString("alter"));

    SName name = {0};
    tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    cJSON_AddItemToObject(root, "tableName", cJSON_CreateString(name.tname));
    cJSON_AddItemToObject(root, "tableType", cJSON_CreateString("super"));
    cJSON_AddItemToObject(root, "alterType", cJSON_CreateNumber(req.alterType));

    switch (req.alterType) {
      // Adding a field and changing its width emit the same description.
      case TSDB_ALTER_TABLE_ADD_TAG:
      case TSDB_ALTER_TABLE_ADD_COLUMN:
      case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
      case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
        TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
        cJSON_AddItemToObject(root, "colName", cJSON_CreateString(pField->name));
        cJSON_AddItemToObject(root, "colType", cJSON_CreateNumber(pField->type));

        if (pField->type == TSDB_DATA_TYPE_BINARY) {
          int32_t length = pField->bytes - VARSTR_HEADER_SIZE;
          cJSON_AddItemToObject(root, "colLength", cJSON_CreateNumber(length));
        } else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
          int32_t length = (pField->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
          cJSON_AddItemToObject(root, "colLength", cJSON_CreateNumber(length));
        }
        break;
      }
      case TSDB_ALTER_TABLE_DROP_TAG:
      case TSDB_ALTER_TABLE_DROP_COLUMN: {
        TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
        cJSON_AddItemToObject(root, "colName", cJSON_CreateString(pField->name));
        break;
      }
      case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
      case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
        // Entry 0 holds the current name, entry 1 the new one.
        TAOS_FIELD* pOldField = taosArrayGet(req.pFields, 0);
        TAOS_FIELD* pNewField = taosArrayGet(req.pFields, 1);
        cJSON_AddItemToObject(root, "colName", cJSON_CreateString(pOldField->name));
        cJSON_AddItemToObject(root, "colNewName", cJSON_CreateString(pNewField->name));
        break;
      }
      default:
        break;
    }

    text = cJSON_PrintUnformatted(root);
  }

  cJSON_Delete(root);
  tFreeSMAltertbReq(&req);
  return text;
}

2086
// Decodes a "create super table" meta response and renders it as JSON.
// The first sizeof(SMsgHead) bytes of the payload are skipped before decoding.
// Returns the JSON string, or NULL when decoding fails.
static char* processCreateStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder;
  char*          json = NULL;

  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, body, bodyLen);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) >= 0) {
    json = buildCreateTableJson(&stbReq.schemaRow, &stbReq.schemaTag, stbReq.name, stbReq.suid, TSDB_SUPER_TABLE);
  }

  tDecoderClear(&decoder);
  return json;
}

2108
// Decodes an "alter super table" meta response and renders it as JSON.
// The response is decoded as an SVCreateStbReq; the original alter request embedded in
// it (alterOriData) is what gets serialized.
// The first sizeof(SMsgHead) bytes of the payload are skipped before decoding.
// Returns the JSON string, or NULL when decoding fails.
static char* processAlterStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder;
  char*          json = NULL;

  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, body, bodyLen);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) >= 0) {
    json = buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen);
  }

  tDecoderClear(&decoder);
  return json;
}

2130 2131
// Serializes a "create child table" statement to an unformatted JSON string with the keys
// "type", "tableName", "tableType" ("child"), "using" and "tags".
//
// pTag    - encoded tag values of the child table.
// sname   - name of the super table the child is created from.
// name    - child table name.
// tagName - array of tag names, indexed like the decoded tag values.
// id      - currently unused.
//
// A JSON tag is emitted as a single entry whose value is the tag rendered as text, or as
// no entry when it holds no values. Other tags are emitted one entry per value;
// variable-length values as strings, the rest as numbers. When decoding the tag values
// or allocating a value buffer fails, the JSON is still produced with the entries
// collected so far.
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when the root object
// cannot be created.
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id) {
  char*   string = NULL;
  SArray* pTagVals = NULL;
  cJSON*  json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }

  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("child");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* stbName = cJSON_CreateString(sname);
  cJSON_AddItemToObject(json, "using", stbName);

  cJSON*  tags = cJSON_CreateArray();
  int32_t code = tTagToValArray(pTag, &pTagVals);
  if (code) {
    goto end;
  }

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      goto end;
    }
    char*  pJson = parseTagDatatoJson(pTag);
    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, 0);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    cJSON_AddItemToObject(tag, "type", ttype);
    cJSON* tvalue = cJSON_CreateString(pJson);
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
    taosMemoryFree(pJson);
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);

    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, i);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
    cJSON_AddItemToObject(tag, "type", ttype);

    cJSON* tvalue = NULL;
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
      if (!buf) {
        // `tag` has not been attached to `tags` yet, so deleting `json` later would not
        // reach it; release it here to avoid leaking the half-built entry.
        cJSON_Delete(tag);
        goto end;
      }
      dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
      tvalue = cJSON_CreateString(buf);
      taosMemoryFree(buf);
    } else {
      double val = 0;
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64);
      tvalue = cJSON_CreateNumber(val);
    }

    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
  }

end:
  // From here on `tags` is owned by `json` and released together with it.
  cJSON_AddItemToObject(json, "tags", tags);
  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  taosArrayDestroy(pTagVals);
  return string;
}

2220
// Decodes a "create table" batch meta response and renders it as JSON.
// The first sizeof(SMsgHead) bytes of the payload are skipped before decoding.
// Child tables go through buildCreateCTableJson(), normal tables through
// buildCreateTableJson(); requests of any other type are ignored.
//
// Returns the JSON string of the LAST child/normal request in the batch, or NULL when
// decoding fails or the batch holds no such request.
// NOTE(review): with more than one request in the batch, the strings built for earlier
// requests are overwritten without being released — confirm whether batches larger than
// one can reach this function.
static char* processCreateTable(SMqMetaRsp* metaRsp) {
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq*     pCreateReq;
  char*              string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
                                     pCreateReq->ctb.tagName, pCreateReq->uid);
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
      string =
          buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
    }
  }

_exit:
  // Single clean-up point: the success path used to clear the decoder once before this
  // label and then a second time here.
  tDecoderClear(&decoder);
  return string;
}

2252 2253 2254 2255
// Decodes an "alter table" meta response and renders it as an unformatted JSON string.
// The first sizeof(SMsgHead) bytes of the payload are skipped before decoding.
//
// The JSON always carries "type" ("alter"), "tableName", "tableType" ("child" for a tag
// value update, otherwise "normal") and "alterType". Depending on the action it also
// carries "colName", "colType", "colLength" (BINARY/NCHAR only), "colNewName",
// "colValue" and "colValueNull".
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when decoding fails,
// the root object cannot be created, or the tag value buffer cannot be allocated.
static char* processAlterTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  char*        string = NULL;
  cJSON*       json = NULL;  // declared up front so _exit can release it on every path

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  cJSON_AddItemToObject(json, "alterType", alterType);

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      cJSON_AddItemToObject(json, "colType", colType);

      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      cJSON_AddItemToObject(json, "colType", colType);
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      cJSON_AddItemToObject(json, "colName", tagName);

      // A JSON tag without any value is reported as null as well.
      bool isNull = vAlterTbReq.isNull;
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true);
          buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
        } else {
          // + 3 matches the head-room buildCreateCTableJson reserves for the same
          // dataConverToStr() call; the previous + 1 was smaller than that.
          // NOTE(review): presumably the converter adds surrounding quotes — confirm.
          buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 3, 1);
          if (buf == NULL) {
            // The converter would otherwise write through a NULL pointer.
            goto _exit;
          }
          dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
        }

        cJSON* colValue = cJSON_CreateString(buf);
        cJSON_AddItemToObject(json, "colValue", colValue);
        taosMemoryFree(buf);
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      cJSON_AddItemToObject(json, "colValueNull", isNullCJson);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

_exit:
  // The JSON tree was previously never released; cJSON_Delete() accepts NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2365 2366 2367 2368
// Render a "drop super table" meta message as a JSON string.
//
// The payload that follows the SMsgHead in metaRsp->metaRsp is decoded as an
// SVDropStbReq and turned into
//   {"type":"drop","tableName":<req.name>,"tableType":"super"}
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when the
// message cannot be decoded or the JSON object cannot be created.
static char* processDropSTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  cJSON*       json = NULL;
  char*        string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(req.name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate buffer, so the tree must be released
  // here; cJSON_Delete() is a no-op for NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2396 2397 2398 2399
// Render a "drop table" batch meta message as a JSON string.
//
// The payload that follows the SMsgHead in metaRsp->metaRsp is decoded as an
// SVDropTbBatchReq and turned into
//   {"type":"drop","tableNameList":[<name of every request in the batch>]}
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when the
// message cannot be decoded or the JSON object cannot be created.
static char* processDropTable(SMqMetaRsp* metaRsp) {
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  cJSON*           json = NULL;
  char*            string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);

  // one array entry per table in the batch
  cJSON* tableNameList = cJSON_CreateArray();
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;

    cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
    cJSON_AddItemToArray(tableNameList, tableName);
  }
  cJSON_AddItemToObject(json, "tableNameList", tableNameList);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate buffer, so the tree must be released
  // here; cJSON_Delete() is a no-op for NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2436
// Convert a TMQ meta result into its JSON text form.
//
// Returns NULL when `res` is not a TMQ meta result (TD_RES_TMQ_META) or when
// its resMsgType is not one of the six handled create/alter/drop message
// types; otherwise returns whatever the matching process*() helper returns.
char* tmq_get_json_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  SMqMetaRsp* pRsp = &((SMqMetaRspObj*)res)->metaRsp;

  switch (pRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      return processCreateStb(pRsp);
    case TDMT_VND_ALTER_STB:
      return processAlterStb(pRsp);
    case TDMT_VND_DROP_STB:
      return processDropSTable(pRsp);
    case TDMT_VND_CREATE_TABLE:
      return processCreateTable(pRsp);
    case TDMT_VND_ALTER_TABLE:
      return processAlterTable(pRsp);
    case TDMT_VND_DROP_TABLE:
      return processDropTable(pRsp);
    default:
      return NULL;
  }
}

2458
// Release a JSON meta string through taosMemoryFreeClear().
// NOTE(review): presumably the counterpart of tmq_get_json_meta() - confirm
// that the allocator used by cJSON matches taosMemoryFree.
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
wmmhello's avatar
wmmhello 已提交
2459

2460
// Replay a raw "create super table" meta message on the connection `taos`.
//
// `taos` is dereferenced as an int64_t id and handed to buildRequest(). The
// payload after the SMsgHead in `meta` is decoded as an SVCreateStbReq, its
// column and tag schemas are copied into an SMCreateStbReq (igExists = true,
// source = TD_REQ_FROM_TAOX), and the serialized request is sent as
// TDMT_MND_CREATE_STB through launchQueryImpl().
//
// Returns TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
// allocation failure, the buildRequest() error, or pRequest->code after launch.
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateStbReq req = {0};
  // Zero-initialised because `end` calls tDecoderClear() even on the paths
  // that bail out before tDecoderInit().
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
  if (NULL == pReq.pColumns) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema* pSchema = req.schemaRow.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pColumns, &field);
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  if (NULL == pReq.pTags) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = req.suid;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // first call with a NULL buffer only computes the serialized length
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}

2541
// Replay a raw "drop super table" meta message on the connection `taos`.
//
// `taos` is dereferenced as an int64_t id and handed to buildRequest(). The
// payload after the SMsgHead in `meta` is decoded as an SVDropStbReq and
// re-issued as a TDMT_MND_DROP_STB request (igNotExists = true,
// source = TD_REQ_FROM_TAOX) through launchQueryImpl().
//
// Returns TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY when the
// message buffer cannot be allocated, the buildRequest() error, or
// pRequest->code after launch.
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropStbReq req = {0};
  // Zero-initialised because `end` calls tDecoderClear() even on the paths
  // that bail out before tDecoderInit().
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.suid = req.suid;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // first call with a NULL buffer only computes the serialized length
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}

// Per-vgroup accumulator used by taosCreateTable(): one value per vgId in the
// vgroup hash map.
typedef struct SVgroupCreateTableBatch {
  SVCreateTbBatchReq req;   // req.pArray collects the SVCreateTbReq entries routed to this vgroup
  SVgroupInfo        info;  // vgroup info as filled in by catalogGetTableHashVgroup()
  char               dbName[TSDB_DB_NAME_LEN];  // copy of pRequest->pDb
} SVgroupCreateTableBatch;

static void destroyCreateTbReqBatch(void* data) {
2609
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
wmmhello's avatar
wmmhello 已提交
2610 2611 2612
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2613 2614 2615 2616 2617 2618 2619
// Replay a raw "create table" batch meta message on the connection `taos`.
//
// The payload after the SMsgHead in `meta` is decoded as an
// SVCreateTbBatchReq. Every table request is routed to its vgroup via
// catalogGetTableHashVgroup() and grouped per vgId in a hash map; the groups
// are serialized with serializeVgroupsCreateTableBatch() and submitted as a
// TDMT_VND_CREATE_TABLE query through launchQueryImpl().
//
// Returns TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
// allocation failure, any catalog/rewrite error, or pRequest->code after launch.
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  // group every create-table request by the vgroup that owns the table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      strcpy(tBatch.dbName, pRequest->pDb);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pCreateReq);

      // the map stores a copy of tBatch; if the insert fails the array is
      // still owned here and must be released
      if (taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)) != 0) {
        taosArrayDestroy(tBatch.req.pArray);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pCreateReq);
    }
  }

  // Build the query nodes before serializing so that an allocation failure
  // here cannot strand the serialized buffers; `end` destroys pQuery.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): pBufArray is presumably owned by rewriteToVnodeModifyOpStmt()
  // from here on, including on failure - confirm.
  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

// Per-vgroup accumulator used by taosDropTable(): one value per vgId in the
// vgroup hash map.
typedef struct SVgroupDropTableBatch {
  SVDropTbBatchReq req;   // req.pArray collects the SVDropTbReq entries routed to this vgroup
  SVgroupInfo      info;  // vgroup info as filled in by catalogGetTableHashVgroup()
  char             dbName[TSDB_DB_NAME_LEN];  // left zeroed by taosDropTable()
} SVgroupDropTableBatch;

// Hash-map value destructor (registered via taosHashSetFreeFp): releases the
// request array owned by an SVgroupDropTableBatch entry.
static void destroyDropTbReqBatch(void* data) {
  taosArrayDestroy(((SVgroupDropTableBatch*)data)->req.pArray);
}

L
Liu Jicong 已提交
2726 2727 2728 2729 2730 2731 2732
// Replay a raw "drop table" batch meta message on the connection `taos`.
//
// The payload after the SMsgHead in `meta` is decoded as an SVDropTbBatchReq.
// Every drop request gets igNotExists = true, is routed to its vgroup via
// catalogGetTableHashVgroup() and grouped per vgId in a hash map; the groups
// are serialized with serializeVgroupsDropTableBatch() and submitted as a
// TDMT_VND_DROP_TABLE query through launchQueryImpl().
//
// Returns TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
// allocation failure, any catalog/rewrite error, or pRequest->code after launch.
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  // group every drop-table request by the vgroup that owns the table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pDropReq);

      // the map stores a copy of tBatch; if the insert fails the array is
      // still owned here and must be released
      if (taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)) != 0) {
        taosArrayDestroy(tBatch.req.pArray);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pDropReq);
    }
  }

  // Build the query nodes before serializing so that an allocation failure
  // here cannot strand the serialized buffers; `end` destroys pQuery.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): pBufArray is presumably owned by rewriteToVnodeModifyOpStmt()
  // from here on, including on failure - confirm.
  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

2827 2828 2829 2830 2831 2832 2833 2834
// Replay a raw "alter table" meta message on the connection `taos`.
//
// The payload after the SMsgHead in `meta` is decoded as an SVAlterTbReq only
// to learn the action and the table name. TSDB_ALTER_TABLE_UPDATE_OPTIONS is
// skipped and reported as success. Otherwise the whole original message is
// copied, its SMsgHead.vgId is rewritten to the vgroup that owns the table,
// and the copy is submitted as a TDMT_VND_ALTER_TABLE query.
// TSDB_CODE_VND_TABLE_NOT_EXIST from the server is mapped to 0.
//
// Returns TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
// allocation failure, any catalog/rewrite error, or the (mapped) request code.
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       coder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);

  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pArray = taosArrayInit(1, sizeof(void*));
  if (NULL == pArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == pVgData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pVgData->vg = pInfo;
  pVgData->pData = taosMemoryMalloc(metaLen);
  if (NULL == pVgData->pData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // forward the original message unchanged except for the target vgroup id
  memcpy(pVgData->pData, meta, metaLen);
  ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
  pVgData->size = metaLen;
  pVgData->numOfTables = 1;
  if (NULL == taosArrayPush(pArray, &pVgData)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  // after launch these are no longer released here
  pQuery = NULL;  // no need to free in the end
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
    code = 0;
  }

end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

2933 2934
// Apply a raw meta message to the connection `taos`, dispatching on
// raw_meta.raw_meta_type. TDMT_VND_ALTER_STB is handled by taosCreateStb(),
// the same handler as TDMT_VND_CREATE_STB.
//
// Returns TSDB_CODE_INVALID_PARA when `taos` is NULL or the type is not one of
// the six handled message types; otherwise the handler's return code.
int32_t taos_write_raw_meta(TAOS* taos, tmq_raw_data raw_meta) {
  if (!taos) {
    return TSDB_CODE_INVALID_PARA;
  }

  switch (raw_meta.raw_meta_type) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB:
      return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
    case TDMT_VND_DROP_STB:
      return taosDropStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
    case TDMT_VND_CREATE_TABLE:
      return taosCreateTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
    case TDMT_VND_ALTER_TABLE:
      return taosAlterTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
    case TDMT_VND_DROP_TABLE:
      return taosDropTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
    default:
      return TSDB_CODE_INVALID_PARA;
  }
}

2954 2955
// Release a tmq_raw_data object through taosMemoryFreeClear().
// NOTE(review): only the struct pointer itself is passed to the free macro;
// whether the raw_meta buffer inside it needs separate release is not visible
// here - confirm.
void tmq_free_raw_meta(tmq_raw_data* rawMeta) { taosMemoryFreeClear(rawMeta); }

L
Liu Jicong 已提交
2959
// Commit `msg` for consumer `tmq` by forwarding to tmqCommitInner2() with the
// flag arguments (0, 1); `cb` and `param` are passed through unchanged.
// NOTE(review): the flags are presumably (automatic = 0, async = 1) - confirm
// against tmqCommitInner2().
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}

2964 2965 2966 2967
// Commit `msg` for consumer `tmq` by forwarding to tmqCommitInner2() with the
// flag arguments (0, 0) and no callback; returns that call's result.
// NOTE(review): the flags are presumably (automatic = 0, async = 0) - confirm
// against tmqCommitInner2().
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL);
  return code;
}