tmq.c 90.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
// Forward declaration; called from tmqHandleAllDelayedTask when an ASK_EP task is due.
int32_t tmqAskEp(tmq_t* tmq, bool async);

// Process-wide TMQ state shared by all consumers created in this process.
typedef struct {
  int8_t inited;  // switched 0 -> 1 by compare-and-exchange in tmq_consumer_new
  tmr_h  timer;   // timer controller passed to taosTmrReset for the delayed tasks
} SMqMgmt;

// Single zero-initialized instance of the state above.
static SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
// Leading fields shared by SMqAskEpRspWrapper and SMqPollRspWrapper.
// tmqClearUnhandleMsg reads queued items through this type.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60 61 62
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
  int8_t  ssEnable;
  int32_t ssBatchSize;

  bool hbBgEnable;

63 64 65 66 67
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
68
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
69
  void*          commitCbUserParam;
L
Liu Jicong 已提交
70 71 72
};

struct tmq_t {
L
Liu Jicong 已提交
73
  // conf
74 75 76 77 78 79 80 81 82 83 84
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
85 86
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
87 88 89 90

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
91 92
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
93
  int32_t epSkipCnt;
L
Liu Jicong 已提交
94
#endif
L
Liu Jicong 已提交
95 96
  int64_t pollCnt;

L
Liu Jicong 已提交
97
  // timer
98 99
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
100 101 102
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
103 104 105 106
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
107
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
108
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
109
  STaosQall*  qall;
L
Liu Jicong 已提交
110 111 112 113
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
114 115
};

L
Liu Jicong 已提交
116 117
// Raw metadata blob: pointer, byte length and a type tag.
struct tmq_raw_data {
  void*   raw_meta;
  int32_t raw_meta_len;
  int16_t raw_meta_type;
};

X
Xiaoyu Wang 已提交
122 123 124 125 126 127 128 129
// Values for SMqClientVg.vgStatus.
// NOTE(review): presumably WAIT marks a vgroup with a poll in flight; confirm against the poll code.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer life-cycle states.
// NOTE(review): presumably stored in tmq_t.status; confirm where it is assigned.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
};

L
Liu Jicong 已提交
133
// Task codes written to tmq_t.delayedTask by the timer callbacks and
// dispatched by tmqHandleAllDelayedTask. Numbering starts at 1.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
139
typedef struct {
140 141 142
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
143 144 145 146
  /*int64_t      committedOffset;*/
  /*int64_t      currentOffset;*/
  STqOffsetVal committedOffsetNew;
  STqOffsetVal currentOffsetNew;
L
Liu Jicong 已提交
147
  // connection info
148
  int32_t vgId;
X
Xiaoyu Wang 已提交
149
  int32_t vgStatus;
L
Liu Jicong 已提交
150
  int32_t vgSkipCnt;
151 152 153
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
154
typedef struct {
155
  // subscribe info
L
Liu Jicong 已提交
156
  char* topicName;
L
Liu Jicong 已提交
157
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
158 159 160

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
161 162
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
163 164
} SMqClientTopic;

L
Liu Jicong 已提交
165 166 167 168 169
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
170
  union {
L
Liu Jicong 已提交
171 172
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
173
  };
L
Liu Jicong 已提交
174 175
} SMqPollRspWrapper;

L
Liu Jicong 已提交
176
typedef struct {
L
Liu Jicong 已提交
177 178 179
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
180
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
typedef struct {
183
  tmq_t*  tmq;
L
Liu Jicong 已提交
184
  int32_t code;
L
Liu Jicong 已提交
185
  int32_t async;
X
Xiaoyu Wang 已提交
186
  tsem_t  rspSem;
187 188
} SMqAskEpCbParam;

L
Liu Jicong 已提交
189
typedef struct {
L
Liu Jicong 已提交
190 191
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
192
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
193
  int32_t         epoch;
L
Liu Jicong 已提交
194
  int32_t         vgId;
L
Liu Jicong 已提交
195
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
196
} SMqPollCbParam;
197

L
Liu Jicong 已提交
198
// Disabled: context type of the compiled-out tmqCommitCb / tmqCommitInner pair.
#if 0
typedef struct {
  tmq_t*         tmq;
  int8_t         async;
  int8_t         automatic;
  int8_t         freeOffsets;
  tmq_commit_cb* userCb;
  tsem_t         rspSem;
  int32_t        rspErr;
  SArray*        offsets;
  void*          userParam;
} SMqCommitCbParam;
#endif
L
Liu Jicong 已提交
211

212
typedef struct {
L
Liu Jicong 已提交
213 214 215 216
  tmq_t* tmq;
  int8_t automatic;
  int8_t async;
  /*int8_t         freeOffsets;*/
L
Liu Jicong 已提交
217 218
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
219
  int32_t        rspErr;
220
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
221 222 223 224
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
225 226 227 228 229 230 231
} SMqCommitCbParamSet;

// Per-request context passed to tmqCommitCb2.
typedef struct {
  SMqCommitCbParamSet* params;   // state shared with the sibling requests of the same commit
  STqOffset*           pOffset;  // offset record allocated in tmqSendCommitReq
} SMqCommitCbParam2;

232
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
233
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
234
  conf->withTbName = false;
L
Liu Jicong 已提交
235
  conf->autoCommit = true;
L
Liu Jicong 已提交
236
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
237
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
238 239 240
  return conf;
}

L
Liu Jicong 已提交
241
/**
 * Release a configuration and the connection strings it owns.
 * Passing NULL is a no-op.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  if (conf->ip != NULL) taosMemoryFree(conf->ip);
  if (conf->user != NULL) taosMemoryFree(conf->user);
  if (conf->pass != NULL) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
251 252
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
253
    return TMQ_CONF_OK;
254
  }
L
Liu Jicong 已提交
255

256 257
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
258 259
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
260

L
Liu Jicong 已提交
261 262
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
263
      conf->autoCommit = true;
L
Liu Jicong 已提交
264 265
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
266
      conf->autoCommit = false;
L
Liu Jicong 已提交
267 268 269 270
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
271
  }
L
Liu Jicong 已提交
272

L
Liu Jicong 已提交
273 274 275 276 277
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
292

293 294
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
295
      conf->withTbName = true;
L
Liu Jicong 已提交
296
      return TMQ_CONF_OK;
297
    } else if (strcmp(value, "false") == 0) {
298
      conf->withTbName = false;
L
Liu Jicong 已提交
299
      return TMQ_CONF_OK;
300 301 302 303 304
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
305
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
306
    if (strcmp(value, "true") == 0) {
307 308 309 310 311 312 313 314 315 316 317 318 319
      conf->ssEnable = true;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
      conf->ssEnable = false;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
320 321
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
322
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
323 324 325 326
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
327
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
328 329
  }

L
Liu Jicong 已提交
330
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
331
    conf->ssBatchSize = atoi(value);
L
Liu Jicong 已提交
332 333 334
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
335
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
336 337 338
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
339
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
340 341 342
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
343
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
344 345 346
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
347
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
348 349 350
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
351
  if (strcmp(key, "td.connect.db") == 0) {
352
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
353 354 355
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
356
  return TMQ_CONF_UNKNOWN;
357 358 359
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
360 361
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
362 363
}

L
Liu Jicong 已提交
364 365
/**
 * Append a copy of src to the list.
 *
 * @return 0 on success, -1 when an argument is NULL, the copy cannot be
 *         allocated, or the array cannot grow.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL) return -1;

  char* topic = strdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(&list->container, &topic) == NULL) {
    // the list did not take ownership, so the copy must be released here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
371
/** Destroy the list, releasing every stored string with taosMemoryFree. */
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
376 377 378 379 380 381 382 383 384 385
/** Return the number of entries as reported by taosArrayGetSize. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/** Expose the list's backing storage (the array's pData) as a char* array. */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
386 387 388 389
/**
 * Format "<topicName>:<vg>" into dst.
 * dst must be large enough; the write is not bounded.
 *
 * @return the number of characters written, excluding the terminator.
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
390
// Disabled: earlier commit callback, kept for reference only.
#if 0
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code;
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
    }

    if (pParam->freeOffsets) {
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}
#endif
L
Liu Jicong 已提交
412

D
dapan1121 已提交
413
/**
 * Response callback for one offset-commit request.
 *
 * Records a failing response code in the shared parameter set, releases the
 * per-request offset record and the response buffer, and counts the request
 * down. When the last outstanding response arrives, an async commit invokes
 * its callback and a sync commit gets its semaphore posted.
 *
 * @param param SMqCommitCbParam2 built by tmqSendCommitReq.
 * @param pBuf  response buffer; its pData is released here, as tmqHbCb does.
 * @param code  response code, 0 on success.
 * @return always 0.
 *
 * NOTE(review): param itself is not freed here; confirm the transport layer
 * releases it. The shared set is not freed on the async path either, because
 * the issuing functions still read it after sending.
 */
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  // rspErr was never written before, so every commit looked successful
  if (code != 0) {
    atomic_store_32(&pParamSet->rspErr, code);
  }

  // nothing references the offset record once the request has been sent
  if (pParam->pOffset != NULL) {
    taosMemoryFree(pParam->pOffset);
    pParam->pOffset = NULL;
  }
  if (pBuf != NULL && pBuf->pData != NULL) {
    taosMemoryFree(pBuf->pData);
    pBuf->pData = NULL;
  }

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    // if no more waiting rsp
    if (pParamSet->async) {
      // call async cb func
      if (pParamSet->automatic && pParamSet->tmq->commitCb) {
        pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
      } else if (!pParamSet->automatic && pParamSet->userCb) {
        pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
      }
    } else {
      // wake the synchronous caller; pParamSet must not be touched after this
      tsem_post(&pParamSet->rspSem);
    }
  }
  return 0;
}

L
Liu Jicong 已提交
454 455 456 457 458 459 460
/**
 * Build and send one offset-commit request for pVg of pTopic.
 *
 * The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". On success
 * the request is handed to asyncSendMsgToServer with tmqCommitCb2 as its
 * callback, and the shared counters in pParamSet have been raised by one.
 *
 * @return 0 when the request was sent, -1 on encoding or allocation failure.
 *         On failure nothing is sent and everything allocated here is freed.
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  void*              buf = NULL;
  void*              abuf = NULL;
  SMqCommitCbParam2* pParam = NULL;
  SMsgSendInfo*      pMsgSendInfo = NULL;
  int32_t            groupLen = 0;
  int32_t            len = 0;
  int32_t            code = 0;
  int64_t            transporterId = 0;
  SEncoder           encoder;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffsetNew;

  groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  // the encoded offset follows the message head
  abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffsetNew = pVg->currentOffsetNew;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->fp = tmqCommitCb2;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // Count the request before sending it: the callback decrements waitingRspNum
  // and could otherwise run before the increment.
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  pParamSet->totalRspNum++;

  // send msg
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  // nothing has been handed to the transport yet, so every allocation is still ours
  if (pParam != NULL) taosMemoryFree(pParam);
  if (buf != NULL) taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

/**
 * Commit the offset of the single topic/vgroup that msg was read from.
 *
 * At most one commit request is sent: the first matching vgroup whose current
 * offset differs from its committed offset.
 *
 * @param msg       a TMQ data or meta result; must not be NULL.
 * @param async     zero blocks until the response arrives; non-zero returns
 *                  at once and the outcome is reported through userCb.
 * @param userCb    optional completion callback for async commits.
 * @param userParam passed back to userCb.
 * @return TSDB_CODE_TMQ_INVALID_MSG for an unsupported result type; for a sync
 *         commit, the response code, or -1 when the request could not be sent;
 *         0 for an async commit or when there was nothing to commit.
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
        }
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    // nothing needed committing
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    int32_t code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    // the only request has been answered, so nothing else references the set
    taosMemoryFree(pParamSet);
    return code;
  }

  // async: tmqCommitCb2 reports the outcome
  return 0;

FAIL:
  // The request was never sent, so no callback will run and the set is still ours.
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  if (async) {
    if (userCb != NULL) userCb(tmq, -1, userParam);
    return 0;
  }
  // a synchronous failure used to be reported as success
  return -1;
}

/**
 * Commit offsets.
 *
 * With a non-NULL msg the work is delegated to tmqCommitMsgImpl. Otherwise a
 * commit request is sent for every vgroup of every subscribed topic whose
 * current offset differs from its committed offset.
 *
 * @param automatic non-zero for auto commit: the outcome goes to tmq->commitCb.
 * @param async     zero blocks until all responses arrive.
 * @param userCb    completion callback for manual async commits; may be NULL.
 * @param userParam passed back to userCb.
 * @return for a sync commit, the recorded response code, or -1 when requests
 *         were needed but none could be sent; 0 for an async commit or when
 *         nothing needed committing; -1 when the shared state cannot be allocated.
 *
 * NOTE(review): if a response arrives while later requests are still being
 * sent, tmqCommitCb2 can see the counter reach zero early. That needs a guard
 * reference shared with the callback and is not addressed here.
 */
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                        void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  bool sendFailed = false;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          // keep going with the other vgroups, but remember the failure
          sendFailed = true;
        }
      }
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // no request is in flight, so no callback will ever touch the set
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    if (!sendFailed) return 0;

    // Every needed request failed to send. The original callback branch for
    // this case could never run because code was forced to 0 in async mode.
    if (async) {
      if (automatic) {
        if (tmq->commitCb != NULL) tmq->commitCb(tmq, -1, tmq->commitCbUserParam);
      } else if (userCb != NULL) {
        userCb(tmq, -1, userParam);
      }
      return 0;
    }
    return -1;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    int32_t code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    // the response code used to be read and then dropped in favor of 0
    return code;
  }

  // async: tmqCommitCb2 reports the outcome once the last response arrives
  return 0;
}

L
Liu Jicong 已提交
663 664
// Disabled: earlier commit path that sent all offsets in one TDMT_MND_MQ_COMMIT_OFFSET
// request to the management endpoint. Kept for reference; it refers to fields and
// callback signatures that no longer exist in the live code above.
#if 0
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
  int8_t               freeOffsets;
  int32_t              code = -1;

  if (msg == NULL) {
    freeOffsets = 1;
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
    freeOffsets = 0;
    pOffsets = (SArray*)&msg->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  SEncoder encoder;

  // first pass with no buffer to learn the encoded length
  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  if (code < 0) {
    goto END;
  }
  int32_t tlen = encoder.pos;
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tEncoderClear(&encoder);
    goto END;
  }
  tEncoderClear(&encoder);

  // second pass writes into the buffer
  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->freeOffsets = freeOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  } else {
    code = 0;
  }

  // avoid double free if msg is sent
  buf = NULL;

END:
  if (buf) taosMemoryFree(buf);
  /*if (pParam) taosMemoryFree(pParam);*/
  /*if (sendInfo) taosMemoryFree(sendInfo);*/

  if (code != 0 && async) {
    if (automatic) {
      tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
    } else {
      userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

  if (!async && freeOffsets) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}
#endif
L
Liu Jicong 已提交
779

780
void tmqAssignAskEpTask(void* param, void* tmrId) {
L
Liu Jicong 已提交
781
  tmq_t*  tmq = (tmq_t*)param;
782
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
783
  *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
L
Liu Jicong 已提交
784
  taosWriteQitem(tmq->delayedTask, pTaskType);
785
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
786 787 788 789
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
790
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
791 792
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
793
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
794 795 796 797
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
798
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
799 800
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
801
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
802 803
}

804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
/**
 * Heartbeat response callback: releases the response payload, if any.
 * param and code are ignored.
 *
 * @return always 0.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL || pMsg->pData == NULL) {
    return 0;
  }
  taosMemoryFree(pMsg->pData);
  return 0;
}

/**
 * Timer callback: send a heartbeat (consumer id and epoch) to the management
 * endpoint, then re-arm the heartbeat timer for 1000 ms later.
 *
 * The timer is re-armed even when the request cannot be allocated.
 *
 * @param param the tmq_t the timer was armed with.
 * @param tmrId unused.
 */
void tmqSendHbReq(void* param, void* tmrId) {
  // TODO replace with ref
  tmq_t*        tmq = (tmq_t*)param;
  SMsgSendInfo* sendInfo = NULL;
  int64_t       transporterId = 0;
  SEpSet        epSet;

  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
    // previously fell through and wrote through the NULL sendInfo
    goto OVER;
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
  taosTmrReset(tmqSendHbReq, 1000, tmq, tmqMgmt.timer, &tmq->hbLiveTimer);
}

L
Liu Jicong 已提交
844 845 846 847 848 849 850 851
/**
 * Drain the consumer's delayed-task queue and run each task.
 *
 * ASK_EP refreshes endpoints asynchronously and re-arms epTimer for 1000 ms.
 * COMMIT starts an automatic async commit and re-arms commitTimer for
 * autoCommitInterval. REPORT is accepted and ignored.
 *
 * @return 0 on success, -1 with terrno set when the batch holder cannot be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    // the unchecked handle used to be passed straight to the queue reader
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosReadAllQitems(tmq->delayedTask, qall);

  for (;;) {
    // reset every round: an empty batch is detected through a NULL item
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    switch (*pTaskType) {
      case TMQ_DELAYED_TASK__ASK_EP:
        tmqAskEp(tmq, true);
        taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
        break;
      case TMQ_DELAYED_TASK__COMMIT:
        tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
        break;
      case TMQ_DELAYED_TASK__REPORT:
        break;
      default:
        ASSERT(0);
        break;
    }
    taosFreeQitem(pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
868
/**
 * Discard every response the consumer has not processed yet: first the items
 * already in tmq->qall, then everything still waiting in tmq->mqueue.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  // pass 0 drains what is already batched; pass 1 pulls the queue into the batch first
  for (int32_t pass = 0; pass < 2; pass++) {
    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }
    while (1) {
      // Reset before every read, as tmqHandleAllDelayedTask does. Otherwise a
      // read that stores nothing leaves the just-freed pointer in place and it
      // is freed again.
      SMqRspWrapper* msg = NULL;
      taosGetQitem(tmq->qall, (void**)&msg);
      if (msg == NULL) break;
      taosFreeQitem(msg);
    }
  }
}

D
dapan1121 已提交
889
/**
 * Subscribe response callback: stores the response code in the waiting
 * caller's parameter block and posts its semaphore. pMsg is not used.
 *
 * @return always 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* cbParam = (SMqSubscribeCbParam*)param;

  cbParam->rspErr = code;
  /*tmq_t* tmq = pParam->tmq;*/
  tsem_post(&cbParam->rspSem);
  return 0;
}
896

L
Liu Jicong 已提交
897
/**
 * Append the names of the currently subscribed topics to *topics.
 *
 * A list is created when *topics is NULL. Each name is the part of the stored
 * topic name after its first '.'; a stored name without a '.' is reported
 * unchanged.
 *
 * @return 0 on success, -1 when the list cannot be created or extended.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) return -1;
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr returns NULL when there is no '.', and NULL + 1 used to be handed to strdup
    const char* dot = strchr(topic->topicName, '.');
    const char* name = (dot != NULL) ? dot + 1 : topic->topicName;

    if (tmq_list_append(*topics, name) < 0) return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
908 909 910
/**
 * Drop all subscriptions by subscribing to an empty topic list.
 *
 * @return the result of tmq_subscribe.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* emptyTopics = tmq_list_new();
  const int32_t result = tmq_subscribe(tmq, emptyTopics);
  tmq_list_destroy(emptyTopics);
  return result;
}

L
Liu Jicong 已提交
915
#if 0
916
/*
 * Earlier variant of the consumer constructor that takes an existing connection.
 * It is compiled out by the enclosing #if 0.
 *
 * Allocates a zeroed tmq_t, copies the configuration into it, generates a
 * consumer id and creates the topic array, message queue, qall and response
 * semaphore. Returns NULL if the tmq_t or the topic array cannot be allocated.
 */
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pConsumer = taosMemoryCalloc(sizeof(tmq_t), 1);
  if (pConsumer == NULL) {
    return NULL;
  }

  // connection and initial state
  pConsumer->pTscObj = (STscObj*)conn;
  pConsumer->status = 0;
  pConsumer->pollCnt = 0;
  pConsumer->epoch = 0;
  pConsumer->epStatus = 0;
  pConsumer->epSkipCnt = 0;

  // set conf
  strcpy(pConsumer->clientId, conf->clientId);
  strcpy(pConsumer->groupId, conf->groupId);
  pConsumer->autoCommit = conf->autoCommit;
  pConsumer->commit_cb = conf->commit_cb;
  pConsumer->resetOffsetCfg = conf->resetOffset;

  // the mask clears the top bit of the generated id
  pConsumer->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);

  pConsumer->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pConsumer->clientTopics == NULL) {
    taosMemoryFree(pConsumer);
    return NULL;
  }

  pConsumer->mqueue = taosOpenQueue();
  pConsumer->qall = taosAllocateQall();

  tsem_init(&pConsumer->rspSem, 0, 0);

  return pConsumer;
}
L
Liu Jicong 已提交
948
#endif
L
Liu Jicong 已提交
949

L
Liu Jicong 已提交
950
/*
 * Create a consumer from the given configuration.
 *
 * Steps, in order:
 *   1. lazily create the shared TMQ timer (tmqMgmt.timer) on first use;
 *   2. allocate the tmq_t and its topic array / queues;
 *   3. copy the settings from conf and assign a consumer id;
 *   4. initialise the response semaphore;
 *   5. open the internal connection with conf->ip / conf->port and the
 *      configured (or default) user and password;
 *   6. start the heartbeat timer when conf->hbBgEnable is set.
 *
 * errstr and errstrLen are not used by this function.
 *
 * Returns the new consumer, or NULL on failure. terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY for the allocation failures handled here.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      // roll the flag back so that a later call can retry the initialisation
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here and must not be dereferenced; report the group from conf instead
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // consumerId and groupId of pTmq are not populated yet, so log the group from conf
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->ssEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    // the semaphore is the only resource not released by the FAIL label
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  if (pTmq->hbBgEnable) {
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1041 1042
#if 0
/*
 * Commit wrapper, compiled out by the enclosing #if 0.
 * Forwards to tmqCommitInner2() with the consumer's own commit callback and
 * callback parameter, and returns its result.
 */
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  int32_t code = tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
  return code;
}
L
Liu Jicong 已提交
1045
#endif
L
Liu Jicong 已提交
1046

L
Liu Jicong 已提交
1047
/*
 * Send a subscribe request for the topics in topic_list and wait for the reply.
 *
 * Each topic is expanded to a full name with tNameSetDbName()/tNameExtractFullName(),
 * the request is serialised and sent to the mgmt endpoint, and the caller blocks
 * on a semaphore posted by tmqSubscribeCb(). After a successful reply the
 * function polls tmqAskEp() every 500 ms while it reports
 * TSDB_CODE_MND_CONSUMER_NOT_READY, then starts the ask-ep timer and, when
 * auto commit is enabled, the commit timer (each only if not already started).
 *
 * Returns 0 on success, -1 on a local failure before the request is sent, or
 * the non-zero code delivered to tmqSubscribeCb().
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
  }

  // first call only measures the encoded size, second call writes into buf
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .tmq = tmq,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // nothing has been sent yet, so sendInfo is still ours and must be released here
    taosMemoryFree(sendInfo);
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;

  // param lives on this stack frame, so the reply must arrive before returning
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1143
/*
 * Store a commit callback and its user parameter in the configuration.
 * Both values are written to conf as given; no validation is performed.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1148

L
Liu Jicong 已提交
1149
#if 0
L
Liu Jicong 已提交
1150 1151
/*
 * Compiled out by the enclosing #if 0.
 * Returns the skipLogNum field of the poll response embedded in tmq_message,
 * or 0 when tmq_message is NULL.
 */
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message != NULL) {
    SMqPollRsp* pPollRsp = &tmq_message->msg;
    return pPollRsp->skipLogNum;
  }
  return 0;
}
L
Liu Jicong 已提交
1155
#endif
L
Liu Jicong 已提交
1156

D
dapan1121 已提交
1157
/*
 * Response callback for a poll request.
 *
 * param is an SMqPollCbParam; its fields are copied to locals and it is freed
 * immediately. pMsg->pData is released by this function on every path.
 *
 *   - code != 0: the reply is discarded. For TSDB_CODE_TQ_NO_COMMITTED_OFFSET an
 *     END_RSP wrapper is queued on tmq->mqueue first. Falls through to the
 *     failure label.
 *   - reply epoch older than tmq->epoch: the reply is dropped, rspSem is posted
 *     and 0 is returned.
 *   - otherwise the payload is decoded into a new SMqPollRspWrapper (data or
 *     meta response depending on mqMsgType), queued on tmq->mqueue, and rspSem
 *     is posted.
 *
 * The failure label puts the vgroup back to TMQ_VG_STATUS__IDLE when the request
 * epoch still equals tmq->epoch, posts rspSem and returns -1.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  tmq_t*          tmq = pParam->tmq;
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) {
      taosMemoryFree(pMsg->pData);
      // the buffer is gone; clear the pointer so it cannot be released a second time
      pMsg->pData = NULL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        // pMsg->pData has already been released above, so it must not be freed again here
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // in both branches the body is decoded first and the SMqRspHead is then copied over the start of the response
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);

  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;
CREATE_MSG_FAIL:
  // pVg is only touched when the consumer is still on the epoch the request was sent in
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

1242 1243 1244 1245 1246
/*
 * Replace tmq->clientTopics with the topic/vgroup layout carried in pRsp.
 *
 * The current offset of every vgroup the consumer already knows is remembered
 * in a temporary hash keyed by "<topicName>:<vgId>". Each vgroup in the new
 * layout starts from the remembered offset when its key is found, otherwise
 * from an offset whose type is tmq->resetOffsetCfg.
 *
 * Afterwards the consumer status becomes TMQ_CONSUMER_STATUS__NO_TOPIC or
 * TMQ_CONSUMER_STATUS__READY depending on whether the new topic array is empty,
 * and tmq->epoch is set to epoch.
 *
 * Returns true if at least one vgroup was installed; false otherwise, including
 * when the temporary containers cannot be allocated (the consumer is then left
 * unchanged).
 */
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  bool    anyVgSet = false;
  int32_t rspTopicNum = taosArrayGetSize(pRsp->topics);

  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           rspTopicNum);

  SArray* pNewTopics = taosArrayInit(rspTopicNum, sizeof(SMqClientTopic));
  if (pNewTopics == NULL) {
    return false;
  }

  SHashObj* pOffsetHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pOffsetHash == NULL) {
    taosArrayDestroy(pNewTopics);
    return false;
  }

  // pass 1: remember the offsets of the vgroups currently held
  int32_t curTopicNum = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < curTopicNum; t++) {
    SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, t);
    if (pOldTopic->vgs == NULL) {
      continue;
    }

    int32_t oldVgNum = taosArrayGetSize(pOldTopic->vgs);
    tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, oldVgNum);

    for (int32_t v = 0; v < oldVgNum; v++) {
      SMqClientVg* pOldVg = taosArrayGet(pOldTopic->vgs, v);
      sprintf(vgKey, "%s:%d", pOldTopic->topicName, pOldVg->vgId);

      char offsetStr[80];
      tFormatOffset(offsetStr, sizeof(offsetStr), &pOldVg->currentOffsetNew);
      tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
               pOldVg->vgId, vgKey, offsetStr);

      taosHashPut(pOffsetHash, vgKey, strlen(vgKey), &pOldVg->currentOffsetNew, sizeof(STqOffsetVal));
    }
  }

  // pass 2: build the new topic array from the response
  for (int32_t t = 0; t < rspTopicNum; t++) {
    SMqSubTopicEp* pRspTopic = taosArrayGet(pRsp->topics, t);

    SMqClientTopic newTopic = {0};
    newTopic.schema = pRspTopic->schema;
    newTopic.topicName = strdup(pRspTopic->topic);
    tstrncpy(newTopic.db, pRspTopic->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, newTopic.topicName);

    int32_t rspVgNum = taosArrayGetSize(pRspTopic->vgs);
    newTopic.vgs = taosArrayInit(rspVgNum, sizeof(SMqClientVg));

    for (int32_t v = 0; v < rspVgNum; v++) {
      SMqSubVgEp* pRspVg = taosArrayGet(pRspTopic->vgs, v);
      sprintf(vgKey, "%s:%d", newTopic.topicName, pRspVg->vgId);

      // fall back to the configured reset policy when this vgroup was not known before
      STqOffsetVal  startOffset = {.type = tmq->resetOffsetCfg};
      STqOffsetVal* pKnownOffset = taosHashGet(pOffsetHash, vgKey, strlen(vgKey));
      if (pKnownOffset != NULL) {
        startOffset = *pKnownOffset;
      }

      SMqClientVg newVg = {0};
      newVg.pollCnt = 0;
      newVg.currentOffsetNew = startOffset;
      newVg.vgId = pRspVg->vgId;
      newVg.epSet = pRspVg->epSet;
      newVg.vgStatus = TMQ_VG_STATUS__IDLE;
      newVg.vgSkipCnt = 0;

      taosArrayPush(newTopic.vgs, &newVg);
      anyVgSet = true;
    }

    taosArrayPush(pNewTopics, &newTopic);
  }

  // swap in the new layout
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pOffsetHash);
  tmq->clientTopics = pNewTopics;

  if (taosArrayGetSize(tmq->clientTopics) != 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  }

  atomic_store_32(&tmq->epoch, epoch);
  return anyVgSet;
}

L
Liu Jicong 已提交
1328
#if 0
L
Liu Jicong 已提交
1329
/*
 * Earlier endpoint-update routine, compiled out by the enclosing #if 0.
 *
 * For every topic in pRsp a hash of "<topicName>:<vgId>" -> current offset is
 * rebuilt from the matching topic the consumer already holds (if any). Each
 * vgroup of the response then starts from the hashed offset when present,
 * otherwise from the offset carried in the response. The new topic array
 * replaces tmq->clientTopics, the consumer status is updated according to
 * whether the array is empty, and tmq->epoch is set to epoch.
 *
 * Returns true if at least one vgroup was installed, false otherwise.
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  bool    anyVgSet = false;
  int32_t rspTopicNum = taosArrayGetSize(pRsp->topics);

  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           rspTopicNum);

  SArray* pNewTopics = taosArrayInit(rspTopicNum, sizeof(SMqClientTopic));
  if (pNewTopics == NULL) {
    return false;
  }

  SHashObj* pOffsetHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pOffsetHash == NULL) {
    taosArrayDestroy(pNewTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t t = 0; t < rspTopicNum; t++) {
    SMqSubTopicEp* pRspTopic = taosArrayGet(pRsp->topics, t);

    SMqClientTopic newTopic = {0};
    newTopic.schema = pRspTopic->schema;
    // the hash only ever holds the offsets of the topic being processed
    taosHashClear(pOffsetHash);
    newTopic.topicName = strdup(pRspTopic->topic);
    tstrncpy(newTopic.db, pRspTopic->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, newTopic.topicName);

    int32_t curTopicNum = taosArrayGetSize(tmq->clientTopics);
    for (int32_t c = 0; c < curTopicNum; c++) {
      // find old topic
      SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, c);
      if (pOldTopic->vgs == NULL || strcmp(pOldTopic->topicName, pRspTopic->topic) != 0) {
        continue;
      }

      int32_t oldVgNum = taosArrayGetSize(pOldTopic->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, oldVgNum);

      for (int32_t k = 0; k < oldVgNum; k++) {
        SMqClientVg* pOldVg = taosArrayGet(pOldTopic->vgs, k);
        sprintf(vgKey, "%s:%d", newTopic.topicName, pOldVg->vgId);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d build %s", tmq->consumerId, epoch, pOldVg->vgId, vgKey);
        taosHashPut(pOffsetHash, vgKey, strlen(vgKey), &pOldVg->currentOffset, sizeof(int64_t));
      }
      // only the first matching topic is used
      break;
    }

    int32_t rspVgNum = taosArrayGetSize(pRspTopic->vgs);
    newTopic.vgs = taosArrayInit(rspVgNum, sizeof(SMqClientVg));
    for (int32_t v = 0; v < rspVgNum; v++) {
      SMqSubVgEp* pRspVg = taosArrayGet(pRspTopic->vgs, v);
      sprintf(vgKey, "%s:%d", newTopic.topicName, pRspVg->vgId);

      int64_t* pKnownOffset = taosHashGet(pOffsetHash, vgKey, strlen(vgKey));
      int64_t  startOffset = pRspVg->offset;
      tscDebug("consumer:%" PRId64 ", (epoch %d) original offset of vgId:%d is %" PRId64, tmq->consumerId, epoch, pRspVg->vgId, startOffset);
      if (pKnownOffset != NULL) {
        startOffset = *pKnownOffset;
        tscDebug("consumer:%" PRId64 ", (epoch %d) receive offset of vgId:%d, full key is %s", tmq->consumerId, epoch, pRspVg->vgId,
                 vgKey);
      }
      tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64, tmq->consumerId, epoch, pRspVg->vgId, startOffset);

      SMqClientVg newVg = {
          .pollCnt = 0,
          .currentOffset = startOffset,
          .vgId = pRspVg->vgId,
          .epSet = pRspVg->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(newTopic.vgs, &newVg);
      anyVgSet = true;
    }
    taosArrayPush(pNewTopics, &newTopic);
  }

  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pOffsetHash);
  tmq->clientTopics = pNewTopics;

  if (taosArrayGetSize(tmq->clientTopics) != 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  }

  atomic_store_32(&tmq->epoch, epoch);
  return anyVgSet;
}
L
Liu Jicong 已提交
1413
#endif
X
Xiaoyu Wang 已提交
1414

D
dapan1121 已提交
1415
/*
 * Response callback for the ask-ep request sent by tmqAskEp().
 *
 * param is an SMqAskEpCbParam. The response code is stored in pParam->code.
 * A reply whose epoch is not newer than tmq->epoch is ignored.
 *
 *   - synchronous request (pParam->async == 0): the reply is decoded and applied
 *     immediately with tmqUpdateEp2(); pParam->rspSem is then posted. pParam is
 *     not freed here in this mode; tmqAskEp() frees it after its tsem_wait().
 *   - asynchronous request: the decoded reply is wrapped in an
 *     SMqAskEpRspWrapper and queued on tmq->mqueue; pParam is released here.
 *
 * Returns the incoming code, or -1 when the wrapper cannot be allocated.
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = pParam->tmq;
  int8_t           async = pParam->async;
  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp2(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // pParam must not be touched after this post: the woken thread frees it
    tsem_post(&pParam->rspSem);
  } else {
    // tmqAskEp() initialises rspSem for both modes, so it has to be destroyed
    // before the parameter block is released
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1468
/*
 * Send an ask-ep request carrying the consumer id, current epoch and group id
 * to the mgmt endpoint. The reply is handled by tmqAskEpCb().
 *
 * async == false: block until tmqAskEpCb() posts the semaphore and return the
 *                 code it stored in the parameter block.
 * async == true : return 0 right after the request has been handed to
 *                 asyncSendMsgToServer(); tmqAskEpCb() releases the parameter
 *                 block.
 *
 * Returns -1 if any of the local allocations or the semaphore initialisation
 * fails.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = 0;
#if 0
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
  if (epStatus == 1) {
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
    tscTrace("consumer:%" PRId64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
    if (epSkipCnt < 5000) return 0;
  }
  atomic_store_32(&tmq->epSkipCnt, 0);
#endif
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  // consumer id and epoch are converted with htobe64()/htonl() before sending
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    // without a usable semaphore the synchronous wait below could never be woken
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // pair the tsem_init() above; the callback does not free pParam in synchronous mode
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1537 1538
#if 0
/*
 * Compiled out by the enclosing #if 0.
 *
 * Sets the current offset of the vgroup named by offset (topic name + vgId) and
 * drops all unhandled messages. Returns TMQ_RESP_ERR__SUCCESS when the vgroup is
 * found, TMQ_RESP_ERR__FAIL when the group id differs from the consumer's or no
 * matching topic/vgroup exists.
 */
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pTarget = &offset->offset;
  if (strcmp(pTarget->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicNum = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicNum; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, pTarget->topicName) != 0) {
      continue;
    }

    int32_t vgNum = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < vgNum; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId != pTarget->vgId) {
        continue;
      }
      pVg->currentOffset = pTarget->offset;
      tmqClearUnhandleMsg(tmq);
      return TMQ_RESP_ERR__SUCCESS;
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
L
Liu Jicong 已提交
1560
#endif
L
Liu Jicong 已提交
1561

1562
/*
 * Allocate and fill a poll request for one vgroup of one topic.
 *
 * subKey is written as <groupId><TMQ_SEPARATOR><topicName>. The request offset
 * is the vgroup's currentOffsetNew, and a fresh request id is generated. The
 * head's vgId and contLen are stored through htonl().
 *
 * Returns the request, or NULL if the allocation fails.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pPollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pPollReq == NULL) {
    return NULL;
  }

  // subscription key: group id, separator, topic name
  int32_t groupLen = strlen(tmq->groupId);
  char*   pCursor = pPollReq->subKey;
  memcpy(pCursor, tmq->groupId, groupLen);
  pCursor[groupLen] = TMQ_SEPARATOR;
  strcpy(pCursor + groupLen + 1, pTopic->topicName);

  // consumer-level settings
  pPollReq->withTbName = tmq->withTbName;
  pPollReq->timeout = timeout;
  pPollReq->consumerId = tmq->consumerId;
  pPollReq->epoch = tmq->epoch;
  pPollReq->useSnapshot = tmq->useSnapshot;

  // per-request values
  pPollReq->reqOffset = pVg->currentOffsetNew;
  pPollReq->reqId = generateRequestId();

  // message head
  pPollReq->head.vgId = htonl(pVg->vgId);
  pPollReq->head.contLen = htonl(sizeof(SMqPollReq));

  return pPollReq;
}

L
Liu Jicong 已提交
1602 1603
/*
 * Build a meta response object from a queued poll-response wrapper.
 *
 * The topic name, db name and vgId are taken from the wrapper's topic and
 * vgroup handles; the SMqMetaRsp is copied byte-for-byte from the wrapper.
 *
 * Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if the
 * allocation fails.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    // report the failure instead of writing through a NULL pointer
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1613 1614 1615
/*
 * Build a data response object from a queued poll-response wrapper.
 *
 * The topic name, db name and vgId are taken from the wrapper's topic and
 * vgroup handles; the SMqDataRsp is copied byte-for-byte from the wrapper and
 * the result iterator starts at -1. When the response does not carry its own
 * schema (withSchema is false) the schema stored on the topic handle is
 * installed in resInfo.
 *
 * Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if the
 * allocation fails.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // report the failure instead of writing through a NULL pointer
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1631
// Send one poll request to every vgroup of every subscribed topic that is currently idle.
//
// A vgroup is claimed by atomically switching its status from IDLE to WAIT. Vgroups that are not
// idle are skipped and their skip counter is incremented. For each claimed vgroup a request, a
// callback parameter and a send-info object are allocated and handed to asyncSendMsgToServer with
// tmqPollCb as the callback.
//
// Returns 0 when the walk over all vgroups completes, or -1 as soon as building the request or one
// of the allocations fails. On failure the vgroup is put back to IDLE and rspSem is posted;
// requests already sent for earlier vgroups are left in flight.
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      // claim the vgroup; the returned value is the status observed before the exchange
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      pParam->vgId = pVg->vgId;
      pParam->epoch = tmq->epoch;

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      int64_t transporterId = 0;

      char offsetFormatBuf[80];
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);

      // NOTE(review): the return value of asyncSendMsgToServer is not checked here; confirm who
      // releases pReq/pParam/sendInfo when the send fails.
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1708 1709
// Handle a queued response that is not a poll response.
//
// Only endpoint responses (TMQ_MSG_TYPE__EP_RSP) are accepted; for any other type -1 is returned
// and *pReset is left untouched. For an endpoint response, *pReset is set to true when the
// response epoch is newer than the consumer epoch (in which case the endpoints are updated through
// tmqUpdateEp2), and to false otherwise. Returns 0 in both of those cases.
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewerEpoch = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewerEpoch) {
    SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tmqUpdateEp2(tmq, rspWrapper->epoch, &pEpWrapper->msg);
  }

  *pReset = isNewerEpoch;
  return 0;
}

L
Liu Jicong 已提交
1726
// Drain queued responses until one of them yields a result object for the caller.
//
// Items are taken from tmq->qall; when it is empty it is refilled once from tmq->mqueue, and NULL
// is returned if it is still empty afterwards.
//   - END_RSP:       the item is freed, terrno is set to TSDB_CODE_TQ_NO_COMMITTED_OFFSET, NULL is returned.
//   - POLL_RSP:      on matching epoch the vgroup offset is advanced and the vgroup goes back to IDLE;
//                    an empty response (blockNum == 0) is dropped, otherwise a SMqRspObj is returned.
//   - POLL_META_RSP: on matching epoch the vgroup offset is advanced, the vgroup goes back to IDLE and
//                    a SMqMetaRspObj is returned.
//   - anything else: passed to tmqHandleNoPollRsp; if that reports a reset and pollIfReset is set,
//                    tmqPollImpl is called again.
// Poll responses whose epoch differs from the consumer epoch are logged and discarded.
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      // local batch exhausted: pull whatever has arrived on the main queue and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        return NULL;
      }
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPollItem = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPollItem->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPollItem->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(pPollItem);
        continue;
      }

      SMqClientVg* pVgroup = pPollItem->vgHandle;
      pVgroup->currentOffsetNew = pPollItem->dataRsp.rspOffset;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPollItem->dataRsp.blockNum == 0) {
        // nothing to hand out, keep draining
        taosFreeQitem(pPollItem);
        continue;
      }

      SMqRspObj* pDataRes = tmqBuildRspFromWrapper(pPollItem);
      taosFreeQitem(pPollItem);
      return pDataRes;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPollItem = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPollItem->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPollItem->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(pPollItem);
        continue;
      }

      SMqClientVg* pVgroup = pPollItem->vgHandle;
      pVgroup->currentOffsetNew.version = pPollItem->metaRsp.rspOffset;
      pVgroup->currentOffsetNew.type = TMQ_OFFSET__LOG;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaRes = tmqBuildMetaRspFromWrapper(pPollItem);
      taosFreeQitem(pPollItem);
      return pMetaRes;
    }

    // every remaining message type goes through the non-poll handler
    bool needReset = false;
    tmqHandleNoPollRsp(tmq, pItem, &needReset);
    taosFreeQitem(pItem);
    if (pollIfReset && needReset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1796
// Poll the subscribed topics for one result.
//
// tmq     - consumer handle
// timeout - time budget, compared against taosGetTimestampMs() differences; -1 means no limit
//
// Returns a result object (data or meta) as soon as one is available. Returns NULL when the
// consumer is still in INIT status, when sending the poll requests fails, when terrno is
// TSDB_CODE_TQ_NO_COMMITTED_OFFSET after draining the responses, or when the time budget is used up.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  // TODO: in no-topic status the delayed tasks also need processing; currently the function
  // returns before tmqHandleAllDelayedTask is reached.
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsedTime = taosGetTimestampMs() - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait only for what is left of the budget. The previous code waited for the elapsed time,
      // so the wait grew with every iteration and could overshoot the requested timeout.
      // The scaling factor of 1000 is kept from the original call.
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1840
// Close a consumer.
//
// When the consumer is in READY status, the offsets are committed synchronously and then an empty
// topic list is subscribed. The first non-zero code from either step is returned. In any other
// status nothing is done and 0 is returned.
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    // TODO: free resources
    return 0;
  }

  int32_t code = tmq_commit_sync(tmq, NULL);
  if (code != 0) {
    return code;
  }

  // subscribing to an empty list
  tmq_list_t* emptyTopics = tmq_list_new();
  code = tmq_subscribe(tmq, emptyTopics);
  tmq_list_destroy(emptyTopics);

  // TODO: free resources
  return code;
}
L
Liu Jicong 已提交
1858

L
Liu Jicong 已提交
1859 1860
// Map an error code to a message: 0 -> "success", -1 -> "fail", anything else is looked up
// through tstrerror.
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1868

L
Liu Jicong 已提交
1869 1870 1871 1872 1873 1874 1875 1876 1877 1878
// Classify a result handle: TMQ_RES_DATA for a data result, TMQ_RES_TABLE_META for a meta result,
// TMQ_RES_INVALID for anything else.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) return TMQ_RES_DATA;
  if (TD_RES_TMQ_META(res)) return TMQ_RES_TABLE_META;
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1879
// Return the topic name of a data or meta result, without the prefix that precedes the first '.'.
//
// The returned pointer refers into the result object's own `topic` buffer. When the stored name
// contains no '.', the whole stored name is returned (the previous code computed NULL + 1 in that
// case). Returns NULL when `res` is neither a data nor a meta result.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1891 1892 1893 1894
// Return the database name of a data or meta result, without the prefix that precedes the first '.'.
//
// The returned pointer refers into the result object's own `db` buffer. When the stored name
// contains no '.', the whole stored name is returned (the previous code computed NULL + 1 in that
// case). Returns NULL when `res` is neither a data nor a meta result.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1903 1904 1905 1906
// Return the vgroup id stored in a data or meta result, or -1 for any other kind of handle.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1914 1915 1916 1917 1918 1919 1920 1921

// Return the table name of the block the result iterator currently points at.
//
// NULL is returned when `res` is not a data result, when the response carries no table names
// (withTbName unset or blockTbName NULL), or when resIter is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pObj = (SMqRspObj*)res;
  bool       hasNames = pObj->rsp.withTbName && pObj->rsp.blockTbName != NULL;
  bool       iterInRange = pObj->resIter >= 0 && pObj->resIter < pObj->rsp.blockNum;
  if (!(hasNames && iterInRange)) {
    return NULL;
  }

  return (const char*)taosArrayGetP(pObj->rsp.blockTbName, pObj->resIter);
}
1926

1927
// Wrap the raw meta payload of a meta result into a newly allocated tmq_raw_data.
//
// The payload, its length and its message type are assigned directly from the result's metaRsp
// fields; no copy of the payload is made here. Returns NULL when `res` is not a meta result or
// when the descriptor cannot be allocated.
tmq_raw_data* tmq_get_raw_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  tmq_raw_data* raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
  if (raw == NULL) {
    // allocation failed: do not write through a NULL pointer
    return NULL;
  }

  SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
  raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
  raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
  raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType;
  return raw;
}

1939 1940
// Render a "create" description of a normal or super table as an unformatted JSON string.
//
// schemaRow - column schema; every column is emitted under "columns"
// schemaTag - tag schema, may be NULL; every tag is emitted under "tags"
// name      - value of "tableName"
// id        - not used by this function
// t         - TSDB_NORMAL_TABLE yields tableType "normal", any other value yields "super"
//
// BINARY and NCHAR entries additionally get a "length" member derived from their byte size.
// Returns the string produced by cJSON_PrintUnformatted, or NULL if the root object cannot be created.
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
                                  int8_t t) {
  cJSON* root = cJSON_CreateObject();
  if (root == NULL) {
    return NULL;
  }

  cJSON_AddItemToObject(root, "type", cJSON_CreateString("create"));
  cJSON_AddItemToObject(root, "tableName", cJSON_CreateString(name));
  cJSON_AddItemToObject(root, "tableType", cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super"));

  cJSON* colArray = cJSON_CreateArray();
  for (int idx = 0; idx < schemaRow->nCols; idx++) {
    cJSON*   colObj = cJSON_CreateObject();
    SSchema* pCol = &schemaRow->pSchema[idx];

    cJSON_AddItemToObject(colObj, "name", cJSON_CreateString(pCol->name));
    cJSON_AddItemToObject(colObj, "type", cJSON_CreateNumber(pCol->type));
    if (pCol->type == TSDB_DATA_TYPE_BINARY) {
      int32_t nChars = pCol->bytes - VARSTR_HEADER_SIZE;
      cJSON_AddItemToObject(colObj, "length", cJSON_CreateNumber(nChars));
    } else if (pCol->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t nChars = (pCol->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON_AddItemToObject(colObj, "length", cJSON_CreateNumber(nChars));
    }
    cJSON_AddItemToArray(colArray, colObj);
  }
  cJSON_AddItemToObject(root, "columns", colArray);

  cJSON* tagArray = cJSON_CreateArray();
  // schemaTag is NULL for normal tables, which leaves the "tags" array empty
  for (int idx = 0; schemaTag && idx < schemaTag->nCols; idx++) {
    cJSON*   tagObj = cJSON_CreateObject();
    SSchema* pTagCol = &schemaTag->pSchema[idx];

    cJSON_AddItemToObject(tagObj, "name", cJSON_CreateString(pTagCol->name));
    cJSON_AddItemToObject(tagObj, "type", cJSON_CreateNumber(pTagCol->type));
    if (pTagCol->type == TSDB_DATA_TYPE_BINARY) {
      int32_t nChars = pTagCol->bytes - VARSTR_HEADER_SIZE;
      cJSON_AddItemToObject(tagObj, "length", cJSON_CreateNumber(nChars));
    } else if (pTagCol->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t nChars = (pTagCol->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON_AddItemToObject(tagObj, "length", cJSON_CreateNumber(nChars));
    }
    cJSON_AddItemToArray(tagArray, tagObj);
  }
  cJSON_AddItemToObject(root, "tags", tagArray);

  char* out = cJSON_PrintUnformatted(root);
  cJSON_Delete(root);
  return out;
}

2007 2008 2009 2010
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  char*         string = NULL;
wmmhello's avatar
wmmhello 已提交
2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto end;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
2022 2023
  //  cJSON* uid = cJSON_CreateNumber(id);
  //  cJSON_AddItemToObject(json, "uid", uid);
wmmhello's avatar
wmmhello 已提交
2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035
  SName name = {0};
  tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  cJSON* tableName = cJSON_CreateString(name.tname);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  cJSON* alterType = cJSON_CreateNumber(req.alterType);
  cJSON_AddItemToObject(json, "alterType", alterType);
  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
2036 2037
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
2038 2039 2040 2041
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);

2042
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
2043
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
2044
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2045
        cJSON_AddItemToObject(json, "colLength", cbytes);
2046 2047 2048
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2049 2050 2051 2052 2053
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
2054 2055 2056
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
2057 2058 2059 2060
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
2061 2062 2063
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
2064 2065 2066
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);
2067
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
2068
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
2069
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2070
        cJSON_AddItemToObject(json, "colLength", cbytes);
2071 2072 2073
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2074 2075 2076 2077 2078
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
2079 2080 2081 2082
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      cJSON*      colName = cJSON_CreateString(oldField->name);
wmmhello's avatar
wmmhello 已提交
2083 2084 2085 2086 2087 2088 2089 2090 2091 2092
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(newField->name);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

2093
end:
wmmhello's avatar
wmmhello 已提交
2094 2095 2096 2097 2098
  cJSON_Delete(json);
  tFreeSMAltertbReq(&req);
  return string;
}

2099
// Decode a create-super-table request from a meta response and render it as JSON.
// The payload starts after the SMsgHead at the front of metaRsp->metaRsp.
// Returns NULL when decoding fails.
static char* processCreateStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  char*          json = NULL;

  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&coder, &req) >= 0) {
    json = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
  }

  // the decoder is released on both the success and the failure path
  tDecoderClear(&coder);
  return json;
}

2121
// Decode an alter-super-table request from a meta response and render it as JSON.
// The message is decoded as a SVCreateStbReq whose alterOriData/alterOriDataLen fields carry the
// serialized alter request that buildAlterSTableJson consumes.
// Returns NULL when decoding fails.
static char* processAlterStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  char*          json = NULL;

  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&coder, &req) >= 0) {
    json = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
  }

  // the decoder is released on both the success and the failure path
  tDecoderClear(&coder);
  return json;
}

2143 2144
// Render a "create" description of a child table as an unformatted JSON string.
//
// pTag    - encoded tag values of the child table
// sname   - value of "using" (the super table)
// name    - value of "tableName"
// tagName - array indexed in parallel with the decoded tag values; supplies each tag's "name"
// id      - not used by this function
//
// A JSON-typed tag is emitted as a single entry whose value comes from parseTagDatatoJson; an
// empty JSON tag (nTag == 0) produces an empty "tags" array. If tTagToValArray fails, the JSON is
// still produced with an empty "tags" array.
// Returns NULL if the root object or a tag value buffer cannot be allocated.
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id) {
  char*   string = NULL;
  SArray* pTagVals = NULL;
  cJSON*  json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);

  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("child");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* stbName = cJSON_CreateString(sname);
  cJSON_AddItemToObject(json, "using", stbName);

  cJSON*  tags = cJSON_CreateArray();
  int32_t code = tTagToValArray(pTag, &pTagVals);
  if (code) {
    goto end;
  }

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      goto end;
    }
    char*  pJson = parseTagDatatoJson(pTag);
    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, 0);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    cJSON_AddItemToObject(tag, "type", ttype);
    cJSON* tvalue = cJSON_CreateString(pJson);
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
    taosMemoryFree(pJson);
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);

    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, i);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
    cJSON_AddItemToObject(tag, "type", ttype);

    // NOTE(review): the buffer sizes (nData + 1 and 32) assume dataConverToStr never writes more
    // than that for the given type - confirm against dataConverToStr.
    char* buf = NULL;
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
    } else {
      buf = taosMemoryCalloc(32, 1);
    }
    if (buf == NULL) {
      // out of memory: `tag` and `tags` are not attached to `json` yet, so release each of them
      cJSON_Delete(tag);
      cJSON_Delete(tags);
      cJSON_Delete(json);
      taosArrayDestroy(pTagVals);
      return NULL;
    }
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
    } else {
      dataConverToStr(buf, pTagVal->type, &pTagVal->i64, tDataTypes[pTagVal->type].bytes, NULL);
    }

    cJSON* tvalue = cJSON_CreateString(buf);
    taosMemoryFree(buf);
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
  }

end:
  cJSON_AddItemToObject(json, "tags", tags);
  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  taosArrayDestroy(pTagVals);
  return string;
}

2231
// Decode a create-table batch from a meta response and render one of its requests as JSON.
//
// Only a single string can be returned. As before, the request that determines it is the last one
// in the batch whose type is TSDB_CHILD_TABLE or TSDB_NORMAL_TABLE. The batch is therefore walked
// from the back and JSON is built for that request only; the previous forward loop built a string
// for every request and overwrote (leaked) all but the last.
// Returns NULL when decoding fails or the batch holds no request of a handled type.
static char* processCreateTable(SMqMetaRsp* metaRsp) {
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  char*              string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  for (int32_t iReq = req.nReqs - 1; iReq >= 0; iReq--) {
    SVCreateTbReq* pCreateReq = req.pReqs + iReq;
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
                                     pCreateReq->ctb.tagName, pCreateReq->uid);
      break;
    }
    if (pCreateReq->type == TSDB_NORMAL_TABLE) {
      string =
          buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
      break;
    }
  }

_exit:
  // single release point; the decoder used to be cleared twice on the success path
  tDecoderClear(&decoder);
  return string;
}

2263 2264 2265 2266
// Decode an alter-table request from a meta response and render it as an unformatted JSON string.
//
// tableType is "child" for TSDB_ALTER_TABLE_UPDATE_TAG_VAL and "normal" for every other action.
// Depending on the action the JSON carries "colName", "colType", "colLength", "colNewName",
// "colValue" and "colValueNull". A JSON tag with nTag == 0 is reported as a null value.
//
// Returns NULL when decoding fails, when the root object cannot be created, or when the buffer for
// a tag value cannot be allocated. The cJSON tree is released before returning on every path.
static char* processAlterTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;
  char*        string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  cJSON_AddItemToObject(json, "alterType", alterType);

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      cJSON_AddItemToObject(json, "colType", colType);

      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      cJSON_AddItemToObject(json, "colType", colType);
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      cJSON_AddItemToObject(json, "colName", tagName);

      bool isNull = vAlterTbReq.isNull;
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        // an empty JSON tag is reported as a null value
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true);
          buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
        } else {
          buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 1, 1);
          if (buf == NULL) {
            // out of memory: give up instead of letting dataConverToStr write through NULL
            goto _exit;
          }
          dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
        }

        cJSON* colValue = cJSON_CreateString(buf);
        cJSON_AddItemToObject(json, "colValue", colValue);
        taosMemoryFree(buf);
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      cJSON_AddItemToObject(json, "colValueNull", isNullCJson);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

_exit:
  // the printed string is independent of the tree, so the tree is always released here
  // (it used to be leaked); cJSON_Delete is also reached with json == NULL on early exits
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2376 2377 2378 2379
// Render a "drop super table" meta response as a JSON string.
//
// The bytes that follow the SMsgHead in metaRsp->metaRsp are decoded as an
// SVDropStbReq and printed as
//   {"type":"drop","tableName":<req.name>,"tableType":"super"}
//
// @param metaRsp  meta response holding the encoded request
// @return string allocated by cJSON_PrintUnformatted, or NULL when decoding
//         fails or the JSON object cannot be created. The caller releases the
//         string (tmq_get_json_meta hands it out; tmq_free_json_meta frees it).
static char* processDropSTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  char*        string = NULL;
  cJSON*       json = NULL;  // declared up front so every exit path can release it

  // decode: the request body starts right after the message head
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(req.name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate allocation, so the tree is always freed
  // here; cJSON_Delete accepts NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2407 2408 2409 2410
// Render a "drop table" batch meta response as a JSON string.
//
// The bytes that follow the SMsgHead in metaRsp->metaRsp are decoded as an
// SVDropTbBatchReq and printed as
//   {"type":"drop","tableNameList":[<name>, ...]}
// with one array entry per request in the batch.
//
// @param metaRsp  meta response holding the encoded batch request
// @return string allocated by cJSON_PrintUnformatted, or NULL when decoding
//         fails or a JSON container cannot be created. The caller releases the
//         string (tmq_get_json_meta hands it out; tmq_free_json_meta frees it).
static char* processDropTable(SMqMetaRsp* metaRsp) {
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  char*            string = NULL;
  cJSON*           json = NULL;  // declared up front so every exit path can release it

  // decode: the request body starts right after the message head
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);

  cJSON* tableNameList = cJSON_CreateArray();
  if (tableNameList == NULL) {
    // without the array every name item created below would be orphaned
    goto _exit;
  }
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;

    cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
    cJSON_AddItemToArray(tableNameList, tableName);
  }
  cJSON_AddItemToObject(json, "tableNameList", tableNameList);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate allocation, so the tree is always freed
  // here; cJSON_Delete accepts NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2447
// Build the JSON description of a TMQ meta result.
//
// The result is dispatched on metaRsp.resMsgType to the matching process*()
// converter. Returns NULL when `res` is not a TMQ meta result, when the message
// type has no converter, or when the converter itself returns NULL.
char* tmq_get_json_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  SMqMetaRspObj* pObj = (SMqMetaRspObj*)res;
  switch (pObj->metaRsp.resMsgType) {
    case TDMT_VND_CREATE_STB:
      return processCreateStb(&pObj->metaRsp);
    case TDMT_VND_ALTER_STB:
      return processAlterStb(&pObj->metaRsp);
    case TDMT_VND_DROP_STB:
      return processDropSTable(&pObj->metaRsp);
    case TDMT_VND_CREATE_TABLE:
      return processCreateTable(&pObj->metaRsp);
    case TDMT_VND_ALTER_TABLE:
      return processAlterTable(&pObj->metaRsp);
    case TDMT_VND_DROP_TABLE:
      return processDropTable(&pObj->metaRsp);
    default:
      return NULL;
  }
}

2469
// Release a JSON string through taosMemoryFreeClear (the counterpart of
// tmq_get_json_meta).
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
wmmhello's avatar
wmmhello 已提交
2470

2471
// Replay a raw "create super table" meta message against the connection `taos`.
//
// The payload after the SMsgHead is decoded as an SVCreateStbReq, converted to
// an SMCreateStbReq (TDMT_MND_CREATE_STB) and sent synchronously through
// launchQueryImpl. taos_write_raw_meta also routes TDMT_VND_ALTER_STB here; the
// request is sent with igExists = true and source = TD_REQ_FROM_TAOX.
//
// @param taos     connection handle; read as a pointer to an int64_t id
// @param meta     raw message (SMsgHead followed by the encoded request)
// @param metaLen  length of `meta` in bytes
// @return TSDB_CODE_SUCCESS or an error code (no database selected, decode
//         failure, out of memory, or the code reported by the request)
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateStbReq req = {0};
  // Zero-initialised: the early `goto end` paths reach tDecoderClear() before
  // tDecoderInit() has run (the sibling functions initialise it the same way).
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // build create stable: copy column schemas
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
  if (NULL == pReq.pColumns) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema* pSchema = req.schemaRow.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pColumns, &field);
  }
  // copy tag schemas
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  if (NULL == pReq.pTags) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = req.suid;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  // qualify the table name with the account and the request's database
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // serialize: first call sizes the buffer, second call fills it
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}

2552
// Replay a raw "drop super table" meta message against the connection `taos`.
//
// The payload after the SMsgHead is decoded as an SVDropStbReq, converted to an
// SMDropStbReq (TDMT_MND_DROP_STB, igNotExists = true, source =
// TD_REQ_FROM_TAOX) and sent synchronously through launchQueryImpl.
//
// @param taos     connection handle; read as a pointer to an int64_t id
// @param meta     raw message (SMsgHead followed by the encoded request)
// @param metaLen  length of `meta` in bytes
// @return TSDB_CODE_SUCCESS or an error code (no database selected, decode
//         failure, out of memory, or the code reported by the request)
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropStbReq req = {0};
  // Zero-initialised: the early `goto end` paths reach tDecoderClear() before
  // tDecoderInit() has run (the sibling functions initialise it the same way).
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.suid = req.suid;

  // qualify the table name with the account and the request's database
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // serialize: first call sizes the buffer, second call fills it
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}

// Per-vgroup accumulator used by taosCreateTable while grouping create-table
// requests; stored by value in a hash keyed by vgId.
typedef struct SVgroupCreateTableBatch {
  // requests gathered for this vgroup (req.pArray holds SVCreateTbReq items)
  SVCreateTbBatchReq req;
  // vgroup the gathered requests are routed to
  SVgroupInfo        info;
  // database name, copied from the request's pDb
  char               dbName[TSDB_DB_NAME_LEN];
} SVgroupCreateTableBatch;

static void destroyCreateTbReqBatch(void* data) {
2620
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
wmmhello's avatar
wmmhello 已提交
2621 2622 2623
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2624 2625 2626 2627 2628 2629 2630
// Replay a raw "create table" batch meta message against the connection `taos`.
//
// The payload after the SMsgHead is decoded as an SVCreateTbBatchReq. Each
// request is assigned to its vgroup through the catalog, the per-vgroup batches
// are serialized, and the result is launched as a TDMT_VND_CREATE_TABLE query.
//
// @param taos     connection handle; read as a pointer to an int64_t id
// @param meta     raw message (SMsgHead followed by the encoded batch)
// @param metaLen  length of `meta` in bytes
// @return TSDB_CODE_SUCCESS or an error code (no database selected, decode
//         failure, catalog failure, out of memory, or the request's own code)
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  // vgId -> SVgroupCreateTableBatch; entries release their arrays via the free fp
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  // loop to create table: group every request under the vgroup that owns it
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      // first request for this vgroup: start a new batch
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      strcpy(tBatch.dbName, pRequest->pDb);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pCreateReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pCreateReq);
    }
  }

  SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): pBufArray is not released on the failure paths below; how it
  // must be destroyed is not visible from this function - confirm and add.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

// Per-vgroup accumulator used by taosDropTable while grouping drop-table
// requests; stored by value in a hash keyed by vgId.
typedef struct SVgroupDropTableBatch {
  // requests gathered for this vgroup (req.pArray holds SVDropTbReq items)
  SVDropTbBatchReq req;
  // vgroup the gathered requests are routed to
  SVgroupInfo      info;
  // database name; taosDropTable leaves this zeroed
  char             dbName[TSDB_DB_NAME_LEN];
} SVgroupDropTableBatch;

// Free callback for hash entries of type SVgroupDropTableBatch: destroys the
// entry's request array. The entry itself is not freed here.
static void destroyDropTbReqBatch(void* data) {
  taosArrayDestroy(((SVgroupDropTableBatch*)data)->req.pArray);
}

L
Liu Jicong 已提交
2737 2738 2739 2740 2741 2742 2743
// Replay a raw "drop table" batch meta message against the connection `taos`.
//
// The payload after the SMsgHead is decoded as an SVDropTbBatchReq. Each
// request is marked igNotExists and assigned to its vgroup through the catalog,
// the per-vgroup batches are serialized, and the result is launched as a
// TDMT_VND_DROP_TABLE query.
//
// @param taos     connection handle; read as a pointer to an int64_t id
// @param meta     raw message (SMsgHead followed by the encoded batch)
// @param metaLen  length of `meta` in bytes
// @return TSDB_CODE_SUCCESS or an error code (no database selected, decode
//         failure, catalog failure, out of memory, or the request's own code)
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  // vgId -> SVgroupDropTableBatch; entries release their arrays via the free fp
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  // loop to drop table: group every request under the vgroup that owns it
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      // first request for this vgroup: start a new batch
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pDropReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pDropReq);
    }
  }

  SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): pBufArray is not released on the failure paths below; how it
  // must be destroyed is not visible from this function - confirm and add.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

2838 2839 2840 2841 2842 2843 2844 2845
// Replay a raw "alter table" meta message against the connection `taos`.
//
// The payload after the SMsgHead is decoded as an SVAlterTbReq only to learn
// the action and the table name. The original message bytes are then copied,
// stamped with the target vgId, and launched as a TDMT_VND_ALTER_TABLE query.
// TSDB_ALTER_TABLE_UPDATE_OPTIONS is skipped and a "table not exist" result
// from the vnode is reported as success.
//
// @param taos     connection handle; read as a pointer to an int64_t id
// @param meta     raw message (SMsgHead followed by the encoded request)
// @param metaLen  length of `meta` in bytes
// @return TSDB_CODE_SUCCESS (0) or an error code (no database selected, decode
//         failure, catalog failure, out of memory, or the request's own code)
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       coder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);

  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // find the vgroup that owns the table
  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pArray = taosArrayInit(1, sizeof(void*));
  if (NULL == pArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == pVgData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pVgData->vg = pInfo;
  pVgData->pData = taosMemoryMalloc(metaLen);
  if (NULL == pVgData->pData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // forward the original message unchanged except for the vgId in its head
  memcpy(pVgData->pData, meta, metaLen);
  ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
  pVgData->size = metaLen;
  pVgData->numOfTables = 1;
  taosArrayPush(pArray, &pVgData);

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  // after the launch these three must not be released by the cleanup below
  pQuery = NULL;  // no need to free in the end
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
    code = 0;
  }

end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

2944
// Apply a raw meta message to the connection `taos`.
//
// The message is dispatched on raw_meta->raw_meta_type to the matching taos*()
// handler. Returns TSDB_CODE_INVALID_PARA when either argument is NULL or the
// type has no handler; otherwise returns the handler's result.
int32_t taos_write_raw_meta(TAOS* taos, tmq_raw_data* raw_meta) {
  if (taos == NULL || raw_meta == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  switch (raw_meta->raw_meta_type) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB:
      // NOTE(review): alter-stb is routed to the create handler as well;
      // presumably intentional (that handler sets igExists) - confirm.
      return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_DROP_STB:
      return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_CREATE_TABLE:
      return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_ALTER_TABLE:
      return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_DROP_TABLE:
      return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    default:
      return TSDB_CODE_INVALID_PARA;
  }
}

2965 2966
// Release a tmq_raw_data object through taosMemoryFreeClear. Only the struct
// itself is freed here; any buffer it points to is left untouched.
void tmq_free_raw_meta(tmq_raw_data* rawMeta) { taosMemoryFreeClear(rawMeta); }

L
Liu Jicong 已提交
2970
// Commit through tmqCommitInner2 and discard its return value; `cb` and `param`
// are forwarded unchanged.
// NOTE(review): the literal arguments 0 and 1 presumably mean "not an automatic
// commit" and "asynchronous" - confirm against tmqCommitInner2.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}

2975 2976 2977 2978
// Commit through tmqCommitInner2 without a callback and return its result.
// NOTE(review): the literal arguments 0 and 0 presumably mean "not an automatic
// commit" and "synchronous" - confirm against tmqCommitInner2.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL);
  return code;
}