tmq.c 90.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
int32_t tmqAskEp(tmq_t* tmq, bool async);

// Process-wide TMQ state; the single file-scope instance is tmqMgmt.
typedef struct {
  int8_t inited;  // flipped 0 -> 1 with a compare-exchange in tmq_consumer_new before the timer is created
  tmr_h  timer;   // handle returned by taosTmrInit; passed to the taosTmrReset calls in this file
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
// Generic view of a queued response: SMqAskEpRspWrapper and SMqPollRspWrapper
// begin with the same two members, and tmqClearUnhandleMsg reads queue items
// through this type.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60 61 62
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
  int8_t  ssEnable;
  int32_t ssBatchSize;

  bool hbBgEnable;

63 64 65 66 67
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
68
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
69
  void*          commitCbUserParam;
L
Liu Jicong 已提交
70 71 72
};

struct tmq_t {
L
Liu Jicong 已提交
73
  // conf
74 75 76 77 78 79 80 81 82 83 84
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
85 86
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
87 88 89 90

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
91 92
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
93
  int32_t epSkipCnt;
L
Liu Jicong 已提交
94
#endif
L
Liu Jicong 已提交
95 96
  int64_t pollCnt;

L
Liu Jicong 已提交
97
  // timer
98 99
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
100 101 102
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
103 104 105 106
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
107
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
108
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
109
  STaosQall*  qall;
L
Liu Jicong 已提交
110 111 112 113
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
114 115
};

L
Liu Jicong 已提交
116 117
// Raw metadata buffer together with its length and type tag.
struct tmq_raw_data {
  void*   raw_meta;
  int32_t raw_meta_len;
  int16_t raw_meta_type;
};

X
Xiaoyu Wang 已提交
122 123 124 125 126 127 128 129
// Per-vgroup status values (presumably stored in SMqClientVg.vgStatus -- confirm against the poll code).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status values (presumably stored in tmq_t.status -- confirm against the subscribe/poll code).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
};

L
Liu Jicong 已提交
133
// Task tags written into tmq_t.delayedTask by the tmqAssign*Task timer
// callbacks and dispatched in tmqHandleAllDelayedTask.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
139
typedef struct {
140 141 142
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
143 144 145 146
  /*int64_t      committedOffset;*/
  /*int64_t      currentOffset;*/
  STqOffsetVal committedOffsetNew;
  STqOffsetVal currentOffsetNew;
L
Liu Jicong 已提交
147
  // connection info
148
  int32_t vgId;
X
Xiaoyu Wang 已提交
149
  int32_t vgStatus;
L
Liu Jicong 已提交
150
  int32_t vgSkipCnt;
151 152 153
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
154
typedef struct {
155
  // subscribe info
L
Liu Jicong 已提交
156
  char* topicName;
L
Liu Jicong 已提交
157
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
158 159 160

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
161 162
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
163 164
} SMqClientTopic;

L
Liu Jicong 已提交
165 166 167 168 169
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
170
  union {
L
Liu Jicong 已提交
171 172
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
173
  };
L
Liu Jicong 已提交
174 175
} SMqPollRspWrapper;

L
Liu Jicong 已提交
176
typedef struct {
L
Liu Jicong 已提交
177 178 179
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
180
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
typedef struct {
183
  tmq_t*  tmq;
L
Liu Jicong 已提交
184
  int32_t code;
L
Liu Jicong 已提交
185
  int32_t async;
X
Xiaoyu Wang 已提交
186
  tsem_t  rspSem;
187 188
} SMqAskEpCbParam;

L
Liu Jicong 已提交
189
typedef struct {
L
Liu Jicong 已提交
190 191
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
192
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
193
  int32_t         epoch;
L
Liu Jicong 已提交
194
  int32_t         vgId;
L
Liu Jicong 已提交
195
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
196
} SMqPollCbParam;
197

L
Liu Jicong 已提交
198
// Disabled: callback context of the earlier single-request commit path
// (tmqCommitCb / tmqCommitInner, both also compiled out).
#if 0
typedef struct {
  tmq_t*         tmq;
  int8_t         async;
  int8_t         automatic;
  int8_t         freeOffsets;
  tmq_commit_cb* userCb;
  tsem_t         rspSem;
  int32_t        rspErr;
  SArray*        offsets;
  void*          userParam;
} SMqCommitCbParam;
#endif
L
Liu Jicong 已提交
211

212
typedef struct {
L
Liu Jicong 已提交
213 214 215 216
  tmq_t* tmq;
  int8_t automatic;
  int8_t async;
  /*int8_t         freeOffsets;*/
L
Liu Jicong 已提交
217 218
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
219
  int32_t        rspErr;
220
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
221 222 223 224
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
225 226 227 228 229 230 231
} SMqCommitCbParamSet;

// Per-request commit callback context built in tmqSendCommitReq and received
// by tmqCommitCb2.
typedef struct {
  SMqCommitCbParamSet* params;   // shared state of the commit call this request belongs to
  STqOffset*           pOffset;  // heap-allocated offset that was encoded into the request
} SMqCommitCbParam2;

232
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
233
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
234
  conf->withTbName = false;
L
Liu Jicong 已提交
235
  conf->autoCommit = true;
L
Liu Jicong 已提交
236
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
237
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
238 239 240
  return conf;
}

L
Liu Jicong 已提交
241
// Releases a configuration together with its ip/user/pass copies.
// A NULL conf is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }
  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
251 252
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
253
    return TMQ_CONF_OK;
254
  }
L
Liu Jicong 已提交
255

256 257
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
258 259
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
260

L
Liu Jicong 已提交
261 262
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
263
      conf->autoCommit = true;
L
Liu Jicong 已提交
264 265
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
266
      conf->autoCommit = false;
L
Liu Jicong 已提交
267 268 269 270
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
271
  }
L
Liu Jicong 已提交
272

L
Liu Jicong 已提交
273 274 275 276 277
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
292

293 294
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
295
      conf->withTbName = true;
L
Liu Jicong 已提交
296
      return TMQ_CONF_OK;
297
    } else if (strcmp(value, "false") == 0) {
298
      conf->withTbName = false;
L
Liu Jicong 已提交
299
      return TMQ_CONF_OK;
300 301 302 303 304
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
305
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
306
    if (strcmp(value, "true") == 0) {
307 308 309 310 311 312 313 314 315 316 317 318 319
      conf->ssEnable = true;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
      conf->ssEnable = false;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
320 321
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
322
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
323 324 325 326
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
327
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
328 329
  }

L
Liu Jicong 已提交
330
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
331
    conf->ssBatchSize = atoi(value);
L
Liu Jicong 已提交
332 333 334
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
335
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
336 337 338
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
339
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
340 341 342
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
343
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
344 345 346
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
347
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
348 349 350
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
351
  if (strcmp(key, "td.connect.db") == 0) {
352
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
353 354 355
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
356
  return TMQ_CONF_UNKNOWN;
357 358 359
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
360 361
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
362 363
}

L
Liu Jicong 已提交
364 365
// Appends a heap copy of src to the list. Names that do not start with a
// backquote are run through strtolower first.
// Returns 0 on success, -1 for a NULL list, a NULL/empty name, or an
// allocation failure.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;
  SArray* container = &list->container;

  char* topic = strdup(src);
  if (topic == NULL) return -1;
  if (topic[0] != '`') {
    strtolower(topic, src);
  }
  if (taosArrayPush(container, &topic) == NULL) {
    // the list did not take ownership, so release the copy here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
375
// Destroys the list and frees every stored topic string with taosMemoryFree.
// A NULL list is ignored.
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;
  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
380 381 382 383 384 385 386 387 388 389
// Number of topics currently stored in the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Exposes the list's backing storage as an array of topic strings; the memory
// stays owned by the list.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* topics = &list->container;
  char**        names = topics->pData;
  return names;
}

L
Liu Jicong 已提交
390 391 392 393
// Writes "<topicName>:<vg>" into dst and returns the number of characters
// written (terminator excluded). dst must be large enough; no bound is applied.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
394
// Disabled: response callback of the earlier single-request commit path,
// superseded by tmqCommitCb2.
#if 0
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code;
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
    }

    if (pParam->freeOffsets) {
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}
#endif
L
Liu Jicong 已提交
416

D
dapan1121 已提交
417
// Response callback for one per-vgroup commit request.
//
// param is the SMqCommitCbParam2 built by tmqSendCommitReq. A non-zero code is
// recorded in the shared parameter set so that the user callback or the
// synchronous waiter sees the failure; the original dropped it. When the last
// outstanding response arrives, either the matching callback is invoked
// (async) or rspSem is posted (sync). Always returns 0.
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  if (code != 0) {
    pParamSet->rspErr = code;
  }

  // the offset was only needed to encode the request; nothing else holds it
  if (pParam->pOffset) {
    taosMemoryFree(pParam->pOffset);
    pParam->pOffset = NULL;
  }

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    // if no more waiting rsp
    if (pParamSet->async) {
      // call async cb func
      if (pParamSet->automatic && pParamSet->tmq->commitCb) {
        pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
      } else if (!pParamSet->automatic && pParamSet->userCb) {
        pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
      }
    } else {
      // wake the synchronous caller, which reads rspErr and tears the set down
      tsem_post(&pParamSet->rspSem);
    }
  }
  return 0;
}

L
Liu Jicong 已提交
458 459 460 461 462 463 464
// Builds and sends one offset-commit request for pVg of pTopic.
//
// The sub key is "<groupId><TMQ_SEPARATOR><topicName>" and the committed value
// is pVg->currentOffsetNew. On success the request is counted in pParamSet and
// 0 is returned. On failure -1 is returned, nothing is sent, and everything
// allocated here is released (the original leaked it).
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  void*              buf = NULL;
  SMqCommitCbParam2* pParam = NULL;
  SMsgSendInfo*      pMsgSendInfo = NULL;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffsetNew;

  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }
  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffsetNew = pVg->currentOffsetNew;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->fp = tmqCommitCb2;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // send msg
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  // NOTE(review): the counters are bumped after the send, as in the original;
  // a response that arrives first would decrement waitingRspNum below zero.
  pParamSet->waitingRspNum++;
  pParamSet->totalRspNum++;
  return 0;

FAIL:
  if (pParam) taosMemoryFree(pParam);
  if (buf) taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

// Commits the offset of the single topic/vgroup that msg was polled from.
//
// msg must be a data or meta response object; anything else yields
// TSDB_CODE_TMQ_INVALID_MSG. At most one commit request is sent.
//   sync  (async == 0): waits for the response and returns its result; a
//                       failure to build/send the request returns -1.
//   async (async != 0): returns 0; the result is delivered through userCb,
//                       also when the request could not be sent.
// Returns 0 when there is nothing to commit.
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = -1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
        }
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    // nothing was sent, so nobody else references the set
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    // the callback is done with the set once it has posted the semaphore
    taosMemoryFree(pParamSet);
    return code;
  }

  // async: the in-flight request still references pParamSet
  return 0;

FAIL:
  // the send failed before anything went out, so the set is still ours
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  if (async) {
    if (userCb) userCb(tmq, code, userParam);
    return 0;
  }
  return code;
}

// Commits offsets.
//
// With a non-NULL msg the work is delegated to tmqCommitMsgImpl. Otherwise a
// commit request is sent for every vgroup of every subscribed topic whose
// current offset differs from the committed one; vgroups whose request cannot
// be built are skipped.
//   sync  (async == 0): waits for all responses and returns the recorded
//                       result (the original always returned 0).
//   async (async != 0): returns 0; tmqCommitCb2 invokes the callback.
// Returns 0 when nothing needed committing and -1 if the shared state cannot
// be allocated.
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                        void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
      }
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // nothing was sent, so nobody else references the set
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (async) {
    // in-flight requests still reference pParamSet
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  // the last callback is done with the set once it has posted the semaphore
  taosMemoryFree(pParamSet);
  return code;
}

L
Liu Jicong 已提交
667 668
// Disabled: earlier commit path that sent all offsets to the mnode in a single
// request. Superseded by tmqCommitInner2.
#if 0
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
  int8_t               freeOffsets;
  int32_t              code = -1;

  if (msg == NULL) {
    freeOffsets = 1;
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
    freeOffsets = 0;
    pOffsets = (SArray*)&msg->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  SEncoder encoder;

  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  if (code < 0) {
    goto END;
  }
  int32_t tlen = encoder.pos;
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tEncoderClear(&encoder);
    goto END;
  }
  tEncoderClear(&encoder);

  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->freeOffsets = freeOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  } else {
    code = 0;
  }

  // avoid double free if msg is sent
  buf = NULL;

END:
  if (buf) taosMemoryFree(buf);
  /*if (pParam) taosMemoryFree(pParam);*/
  /*if (sendInfo) taosMemoryFree(sendInfo);*/

  if (code != 0 && async) {
    if (automatic) {
      tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
    } else {
      userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

  if (!async && freeOffsets) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}
#endif
L
Liu Jicong 已提交
783

784
void tmqAssignAskEpTask(void* param, void* tmrId) {
L
Liu Jicong 已提交
785
  tmq_t*  tmq = (tmq_t*)param;
786
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
787
  *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
L
Liu Jicong 已提交
788
  taosWriteQitem(tmq->delayedTask, pTaskType);
789
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
790 791 792 793
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
794
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
795 796
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
797
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
798 799 800 801
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
802
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
803 804
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
805
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
806 807
}

808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847
// Heartbeat response callback: releases the response payload, if any.
// param and code are ignored; always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL || pMsg->pData == NULL) {
    return 0;
  }
  taosMemoryFree(pMsg->pData);
  return 0;
}

// Timer callback: sends one heartbeat (consumer id + epoch) to the mnode and
// re-arms itself to run again in 1000 ms. param is the tmq_t*; tmrId is
// unused. If an allocation fails, this round is skipped but the timer is still
// re-armed.
void tmqSendHbReq(void* param, void* tmrId) {
  // TODO replace with ref
  tmq_t*        tmq = (tmq_t*)param;
  int64_t       consumerId = tmq->consumerId;
  int32_t       epoch = tmq->epoch;
  SMsgSendInfo* sendInfo = NULL;

  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
    // the original fell through and wrote into the NULL sendInfo
    goto OVER;
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
  taosTmrReset(tmqSendHbReq, 1000, tmq, tmqMgmt.timer, &tmq->hbLiveTimer);
}

L
Liu Jicong 已提交
848 849 850 851 852 853 854 855
// Drains the delayed-task queue and runs every queued task:
//   ASK_EP -> tmqAskEp(async) and re-arm the endpoint timer (1000 ms)
//   COMMIT -> asynchronous automatic commit and re-arm the commit timer
//   REPORT -> nothing yet
// Returns 0, or -1 if the temporary qall cannot be allocated.
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    return -1;
  }
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);
      taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
872
// Discards every pending response: first whatever is already staged in
// tmq->qall, then everything still sitting in tmq->mqueue.
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* msg = NULL;
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

  msg = NULL;
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
893
// Subscribe response callback: stores the result code in the
// SMqSubscribeCbParam and posts its semaphore. Always returns 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  /*tmq_t* tmq = pParam->tmq;*/
  tsem_post(&pParam->rspSem);
  return 0;
}
900

L
Liu Jicong 已提交
901
// Appends the names of all currently subscribed topics to *topics, creating
// the list when *topics is NULL. Each stored name is stripped of everything up
// to and including its first '.'; a name without a '.' is appended unchanged
// (the original computed NULL + 1 in that case).
// Returns 0, or -1 if the list cannot be created.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return -1;
    }
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    const char*     dot = strchr(topic->topicName, '.');
    const char*     name = (dot != NULL) ? dot + 1 : topic->topicName;
    // append failures are ignored, as before
    (void)tmq_list_append(*topics, name);
  }
  return 0;
}

L
Liu Jicong 已提交
912 913 914
// Unsubscribes from everything by subscribing to an empty topic list.
// Returns the result of tmq_subscribe, or -1 if the temporary list cannot be
// allocated.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    return -1;
  }
  int32_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
919
#if 0
/*
 * Disabled legacy variant of tmq_consumer_new() that takes an already opened
 * connection. Kept for reference only; it is not compiled.
 */
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* consumer = taosMemoryCalloc(sizeof(tmq_t), 1);
  if (consumer == NULL) {
    return NULL;
  }

  // initial state
  consumer->pTscObj = (STscObj*)conn;
  consumer->status = 0;
  consumer->pollCnt = 0;
  consumer->epoch = 0;
  consumer->epStatus = 0;
  consumer->epSkipCnt = 0;

  // settings copied from conf
  strcpy(consumer->clientId, conf->clientId);
  strcpy(consumer->groupId, conf->groupId);
  consumer->autoCommit = conf->autoCommit;
  consumer->commit_cb = conf->commit_cb;
  consumer->resetOffsetCfg = conf->resetOffset;

  // request id with the top bit cleared
  consumer->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);

  consumer->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (consumer->clientTopics == NULL) {
    taosMemoryFree(consumer);
    return NULL;
  }

  consumer->mqueue = taosOpenQueue();
  consumer->qall = taosAllocateQall();

  tsem_init(&consumer->rspSem, 0, 0);

  return consumer;
}
#endif
L
Liu Jicong 已提交
953

L
Liu Jicong 已提交
954
/*
 * Create a TMQ consumer from `conf`.
 *
 * Steps, in order:
 *   1. create the shared TMQ timer the first time any consumer is created;
 *   2. allocate the consumer, copy the settings out of `conf` and assign a
 *      consumer id (done early so that every later log line can identify the
 *      consumer);
 *   3. allocate the topic array and the queues;
 *   4. initialise the response semaphore and open the connection with
 *      taos_connect_internal(), using the default user/password when `conf`
 *      carries none;
 *   5. start the heartbeat timer when conf->hbBgEnable is set.
 *
 * `errstr` / `errstrLen` are accepted but not written by this function.
 *
 * Returns the new consumer, or NULL on failure. Allocation failures set terrno
 * to TSDB_CODE_OUT_OF_MEMORY. Everything acquired before a failure is released.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // there is no consumer object to read from here: report the group from conf
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->ssEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    // the semaphore is the only resource the FAIL path does not know about
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  if (pTmq->hbBgEnable) {
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1045 1046
#if 0
/*
 * Disabled: commit `offsets` through tmqCommitInner2() using the commit
 * callback and user parameter stored on the consumer. Not compiled.
 */
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  int32_t code = tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
  return code;
}
#endif
L
Liu Jicong 已提交
1050

L
Liu Jicong 已提交
1051
/*
 * Subscribe the consumer to the topics in `topic_list`.
 *
 * Builds a SCMSubscribeReq holding the full name of every topic, serialises
 * it, sends it with msgType TDMT_MND_SUBSCRIBE and blocks until
 * tmqSubscribeCb() posts the response semaphore. On success the function then
 * polls tmqAskEp() (sleeping 500 ms between attempts) for as long as it
 * reports TSDB_CODE_MND_CONSUMER_NOT_READY, and finally starts the ask-ep timer
 * and, when auto commit is enabled, the commit timer, unless they already run.
 *
 * Returns 0 on success, the response error code if the request was rejected,
 * or -1 if the request could not be built.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    // the array takes over the name; it is released with the array at FAIL
    taosArrayPush(req.topicNames, &topicFName);
  }

  // first pass computes the encoded length, second pass fills the buffer
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .tmq = tmq,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // sendInfo has not been handed to the transport yet, so it is still ours to free
    taosMemoryFree(sendInfo);
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;

  // param lives on this stack frame: do not return before the callback has posted
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1148
/*
 * Store the commit callback `cb` and its user argument `param` on `conf`.
 * tmq_consumer_new() copies both values into the consumer it creates.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1153

D
dapan1121 已提交
1154
/*
 * Response callback for a poll request sent to one vgroup.
 *
 * `param` is the SMqPollCbParam created by the sender; its fields are copied
 * out and the parameter block is freed immediately. The response payload
 * (pMsg->pData) is released by this function on every path.
 *
 * - Error response: the payload is dropped. For
 *   TSDB_CODE_TQ_NO_COMMITTED_OFFSET an END_RSP wrapper is queued so the poller
 *   is woken with that information. The function then takes the failure path.
 * - Response from an earlier epoch than the consumer's: dropped, returns 0.
 * - Otherwise the payload is decoded into a SMqPollRspWrapper (data or meta
 *   response, by mqMsgType), written to tmq->mqueue, and tmq->rspSem is posted.
 *
 * Failure path (CREATE_MSG_FAIL): if the request's epoch still equals the
 * consumer's, the vgroup is set back to idle; rspSem is posted; returns -1.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  tmq_t*          tmq = pParam->tmq;
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) {
      // release the payload exactly once; later branches must not free it again
      taosMemoryFree(pMsg->pData);
      pMsg->pData = NULL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // the body follows the SMqRspHead; after decoding, the head is copied over the
  // start of the decoded response
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);

  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;
CREATE_MSG_FAIL:
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

1239 1240 1241 1242 1243
/*
 * Replace the consumer's topic/vgroup table with the one in `pRsp` and move the
 * consumer to `epoch`.
 *
 * The current offset of every known (topic, vgId) pair is first remembered in a
 * temporary hash keyed by "<topicName>:<vgId>". The new table is then built
 * from `pRsp`; a vgroup that was already known keeps its remembered offset,
 * any other vgroup starts from an offset whose type is tmq->resetOffsetCfg.
 *
 * Afterwards the consumer status becomes NO_TOPIC when the new table is empty
 * and READY otherwise, and tmq->epoch is updated.
 *
 * Returns true if at least one vgroup was added to the new table; false
 * otherwise, including when the temporary containers cannot be allocated (in
 * which case nothing is changed).
 */
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // remember the offsets of the vgroups we are currently tracking
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        // bounded formatting: the key can never run past vgKey
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, sizeof(buf), &pVgCur->currentOffsetNew);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal));
      }
    }
  }

  // build the new table from the response
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      // default for a vgroup we have not seen before
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffsetNew = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // NOTE(review): only the outer array is destroyed here; the old topics'
  // topicName strings and vgs arrays are not released by this function.
  // Confirm whether in-flight poll callbacks still reference them before
  // adding a deep free.
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1322
#if 0
/*
 * Disabled predecessor of tmqUpdateEp2() that tracks plain int64 offsets.
 * Kept for reference only; it is not compiled.
 *
 * For every topic in `pRsp` the offsets of the matching current topic are
 * collected in a hash keyed by "<topicName>:<vgId>", then the new vgroup list
 * is built, preferring a remembered offset over the one in the response.
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool    changed = false;
  int32_t rspTopicNum = taosArrayGetSize(pRsp->topics);
  char    key[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           rspTopicNum);

  SArray* rebuilt = taosArrayInit(rspTopicNum, sizeof(SMqClientTopic));
  if (rebuilt == NULL) {
    return false;
  }
  SHashObj* offsets = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (offsets == NULL) {
    taosArrayDestroy(rebuilt);
    return false;
  }

  // find topic, build hash
  for (int32_t t = 0; t < rspTopicNum; t++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pEp = taosArrayGet(pRsp->topics, t);
    topic.schema = pEp->schema;
    taosHashClear(offsets);
    topic.topicName = strdup(pEp->topic);
    tstrncpy(topic.db, pEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    // remember the offsets of the first current topic with the same name
    int32_t curTopicNum = taosArrayGetSize(tmq->clientTopics);
    for (int32_t c = 0; c < curTopicNum; c++) {
      SMqClientTopic* pCur = taosArrayGet(tmq->clientTopics, c);
      if (pCur->vgs == NULL || strcmp(pCur->topicName, pEp->topic) != 0) {
        continue;
      }
      int32_t curVgNum = taosArrayGetSize(pCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, curVgNum);
      for (int32_t v = 0; v < curVgNum; v++) {
        SMqClientVg* pCurVg = taosArrayGet(pCur->vgs, v);
        sprintf(key, "%s:%d", topic.topicName, pCurVg->vgId);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d build %s", tmq->consumerId, epoch, pCurVg->vgId, key);
        taosHashPut(offsets, key, strlen(key), &pCurVg->currentOffset, sizeof(int64_t));
      }
      break;
    }

    int32_t rspVgNum = taosArrayGetSize(pEp->vgs);
    topic.vgs = taosArrayInit(rspVgNum, sizeof(SMqClientVg));
    for (int32_t v = 0; v < rspVgNum; v++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pEp->vgs, v);
      sprintf(key, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pKnown = taosHashGet(offsets, key, strlen(key));
      int64_t  offset = pVgEp->offset;
      tscDebug("consumer:%" PRId64 ", (epoch %d) original offset of vgId:%d is %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
      if (pKnown != NULL) {
        offset = *pKnown;
        tscDebug("consumer:%" PRId64 ", (epoch %d) receive offset of vgId:%d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
                 key);
      }
      tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      changed = true;
    }
    taosArrayPush(rebuilt, &topic);
  }

  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(offsets);
  tmq->clientTopics = rebuilt;

  if (taosArrayGetSize(tmq->clientTopics) == 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  }

  atomic_store_32(&tmq->epoch, epoch);
  return changed;
}
#endif
X
Xiaoyu Wang 已提交
1408

D
dapan1121 已提交
1409
/*
 * Response callback for the ask-endpoint request sent by tmqAskEp().
 *
 * `param` is the SMqAskEpCbParam created by tmqAskEp(). The response code is
 * stored in pParam->code. A successful response whose epoch is newer than the
 * consumer's is applied in one of two ways:
 *   - synchronous request: decoded and applied immediately with tmqUpdateEp2();
 *   - asynchronous request: decoded into a SMqAskEpRspWrapper that is written
 *     to tmq->mqueue, after which tmq->rspSem is posted.
 * Responses with an epoch not newer than the consumer's are ignored.
 *
 * On exit the response payload is released (the callback owns it, as in
 * tmqPollCb()). For a synchronous request pParam->rspSem is posted and the
 * waiter frees pParam; for an asynchronous one pParam is freed here.
 *
 * Returns `code`, or -1 if the async wrapper could not be allocated.
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = pParam->tmq;
  int8_t           async = pParam->async;
  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp2(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  // `head` points into the payload and is not used past this point
  if (pMsg != NULL && pMsg->pData != NULL) {
    taosMemoryFree(pMsg->pData);
    pMsg->pData = NULL;
  }

  if (!async) {
    // the waiter in tmqAskEp() owns pParam from here on
    tsem_post(&pParam->rspSem);
  } else {
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1462
/*
 * Ask the management node for the consumer's current topic endpoints.
 *
 * Builds a SMqAskEpReq carrying the consumer id, the current epoch and the
 * group id, and sends it with msgType TDMT_MND_MQ_ASK_EP; tmqAskEpCb() handles
 * the response.
 *
 * async == false: blocks until the callback posts the response semaphore, then
 *                 returns the code the callback stored and releases the
 *                 parameter block.
 * async == true : returns 0 right after the request is handed to the
 *                 transport; the callback frees the parameter block. The
 *                 semaphore is only needed by the blocking mode and is
 *                 therefore only created there.
 *
 * Returns -1 if the request could not be prepared.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t      code = 0;
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  // request fields travel in network byte order
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;

  // tmqAskEpCb() touches rspSem only when async is false
  if (!async && tsem_init(&pParam->rspSem, 0, 0) != 0) {
    tscError("failed to init ask ep semaphore");
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    if (!async) {
      tsem_destroy(&pParam->rspSem);
    }
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // the callback is done with pParam once it has posted
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1531 1532
#if 0
/*
 * Disabled: move the current offset of one (topic, vgroup) pair to the value
 * in `offset` and drop all unhandled messages. Not compiled.
 *
 * Fails when the offset belongs to another consumer group or when no matching
 * topic/vgroup is tracked by the consumer.
 */
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pTarget = &offset->offset;
  if (strcmp(pTarget->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicNum = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicNum; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, pTarget->topicName) != 0) {
      continue;
    }

    int32_t vgNum = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < vgNum; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId != pTarget->vgId) {
        continue;
      }
      pVg->currentOffset = pTarget->offset;
      tmqClearUnhandleMsg(tmq);
      return TMQ_RESP_ERR__SUCCESS;
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
#endif
L
Liu Jicong 已提交
1555

1556
/*
 * Allocate and fill the poll request for one vgroup of one topic.
 *
 * The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". The request
 * offset is the vgroup's currentOffsetNew; consumer id, epoch, table-name and
 * snapshot settings are taken from the consumer. head.vgId and head.contLen
 * are stored in network byte order.
 *
 * Returns the zero-initialised-then-filled request, or NULL if the allocation
 * fails. The caller owns the returned buffer.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pPollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pPollReq == NULL) {
    return NULL;
  }

  // message head
  pPollReq->head.vgId = htonl(pVg->vgId);
  pPollReq->head.contLen = htonl(sizeof(SMqPollReq));

  // subKey: group id, separator, topic name
  int32_t prefixLen = strlen(tmq->groupId);
  char*   cursor = pPollReq->subKey;
  memcpy(cursor, tmq->groupId, prefixLen);
  cursor += prefixLen;
  *cursor = TMQ_SEPARATOR;
  cursor += 1;
  strcpy(cursor, pTopic->topicName);

  // consumer identity and options
  pPollReq->consumerId = tmq->consumerId;
  pPollReq->epoch = tmq->epoch;
  pPollReq->withTbName = tmq->withTbName;
  pPollReq->useSnapshot = tmq->useSnapshot;

  // what to read and how long to wait for it
  pPollReq->reqOffset = pVg->currentOffsetNew;
  pPollReq->timeout = timeout;
  pPollReq->reqId = generateRequestId();

  return pPollReq;
}

L
Liu Jicong 已提交
1596 1597
/*
 * Build the user-facing meta response object from a queued poll wrapper.
 *
 * Topic name and db are copied from the wrapper's topic handle, the vgroup id
 * from its vgroup handle. The meta response itself is copied bitwise, so any
 * pointer it contains is shared with the wrapper rather than duplicated.
 *
 * Returns the new object, or NULL if it cannot be allocated.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1607 1608 1609
/*
 * Build the user-facing data response object from a queued poll wrapper.
 *
 * Topic name and db are copied from the wrapper's topic handle, the vgroup id
 * from its vgroup handle. The data response is copied bitwise, so any pointer
 * it contains is shared with the wrapper rather than duplicated. The result
 * iterator starts at -1 and the result info starts with zero rows and
 * millisecond precision. When the response does not carry its own schema, the
 * topic's schema is installed with setResSchemaInfo().
 *
 * Returns the new object, or NULL if it cannot be allocated.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1625
// Sends one asynchronous poll request to every vgroup of every client topic whose
// status can be switched from IDLE to WAIT.
//
// A vgroup for which the compare-exchange does not report IDLE is skipped and its
// skip counter is incremented. For the others a poll request, a callback parameter
// and a send-info are built and handed to asyncSendMsgToServer(); tmqPollCb is
// registered as the response callback.
//
// Returns 0 once all vgroups have been visited, or -1 as soon as building the
// request or one of the allocations fails. On failure the vgroup is put back to
// IDLE and rspSem is posted.
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      pParam->vgId = pVg->vgId;
      pParam->epoch = tmq->epoch;

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      int64_t transporterId = 0;

      char offsetFormatBuf[80];
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
      // NOTE(review): the return value of asyncSendMsgToServer is not inspected here.
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1702 1703
// Handles a queued response that is not a poll response.
//
// Only endpoint (EP) responses are understood; any other type yields -1 and leaves
// *pReset untouched. For an EP response, the consumer's endpoints are updated via
// tmqUpdateEp2() when the response epoch is newer than the consumer's, and *pReset
// reports whether that update happened. Returns 0 for every EP response.
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewerEpoch = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewerEpoch) {
    SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
    tmqUpdateEp2(tmq, rspWrapper->epoch, pEpRsp);
  }

  *pReset = isNewerEpoch;
  return 0;
}

L
Liu Jicong 已提交
1720
// Drains queued responses until one can be returned to the user.
//
// Items are taken from tmq->qall; when it is empty it is refilled once from
// tmq->mqueue, and NULL is returned if it is still empty.
//   - END_RSP:       sets terrno to TSDB_CODE_TQ_NO_COMMITTED_OFFSET and returns NULL.
//   - POLL_RSP:      on matching epoch, records the new offset, marks the vgroup IDLE and
//                    returns an SMqRspObj (empty responses are dropped and the loop continues).
//   - POLL_META_RSP: on matching epoch, records the new log offset, marks the vgroup IDLE
//                    and returns an SMqMetaRspObj.
//   - anything else: passed to tmqHandleNoPollRsp(); if it reports a reset and pollIfReset
//                    is set, a new round of polls is issued.
// Poll responses whose epoch differs from the consumer's epoch are discarded.
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      // Local batch is exhausted: pull whatever is pending in the shared queue and retry once.
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        return NULL;
      }
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVgroup = pPoll->vgHandle;
      pVgroup->currentOffsetNew = pPoll->dataRsp.rspOffset;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);
      if (pPoll->dataRsp.blockNum == 0) {
        // Nothing to hand out; the offset above has still been advanced.
        taosFreeQitem(pPoll);
        continue;
      }

      SMqRspObj* pDataRes = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pDataRes;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVgroup = pPoll->vgHandle;
      pVgroup->currentOffsetNew.version = pPoll->metaRsp.rspOffset;
      pVgroup->currentOffsetNew.type = TMQ_OFFSET__LOG;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaRes = tmqBuildMetaRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pMetaRes;
    }

    // Every remaining response type goes through the non-poll handler.
    bool wasReset = false;
    tmqHandleNoPollRsp(tmq, pItem, &wasReset);
    taosFreeQitem(pItem);
    if (pollIfReset && wasReset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1790
// Polls the subscribed topics and returns the next available result.
//
// timeout is compared against millisecond timestamps (taosGetTimestampMs); -1 means
// "no deadline". Returns a result object, or NULL when
//   - the consumer is still in INIT status,
//   - issuing the poll requests fails,
//   - tmqHandleAllRsp() left terrno at TSDB_CODE_TQ_NO_COMMITTED_OFFSET, or
//   - the timeout elapsed without a response.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  // A consumer in INIT status returns immediately without polling.
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait only for what is left of the caller's budget. Waiting for the elapsed
      // time instead (as before) made the wait grow with every iteration and could
      // overshoot the deadline. The "* 1000" scaling is kept as it was.
      tsem_timewait(&tmq->rspSem, (timeout - elapsed) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1834
// Closes a consumer.
//
// When the consumer is in READY status, offsets are committed synchronously and the
// consumer then subscribes to an empty topic list. The first non-zero code from either
// step is returned; -1 is returned if the empty list cannot be created. Otherwise 0.
int32_t tmq_consumer_close(tmq_t* tmq) {
  // Read the status atomically, as tmq_consumer_poll() does.
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    tmq_list_t* lst = tmq_list_new();
    if (lst == NULL) {
      // Do not hand a NULL list to tmq_subscribe().
      return -1;
    }
    rsp = tmq_subscribe(tmq, lst);
    tmq_list_destroy(lst);

    if (rsp != 0) {
      return rsp;
    }
  }
  // TODO: free resources
  return 0;
}
L
Liu Jicong 已提交
1852

L
Liu Jicong 已提交
1853 1854
// Maps a tmq return code to a message: 0 -> "success", -1 -> "fail",
// anything else is delegated to tstrerror().
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1862

L
Liu Jicong 已提交
1863 1864 1865 1866 1867 1868 1869 1870 1871 1872
// Classifies a result handle: data result, table-meta result, or invalid.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) return TMQ_RES_DATA;
  if (TD_RES_TMQ_META(res)) return TMQ_RES_TABLE_META;
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1873
// Returns the topic name of a tmq result with everything up to and including the
// first '.' stripped, or NULL if res is neither a data nor a meta result.
// If the stored name contains no '.', the whole name is returned.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that would be undefined behaviour.
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1885 1886 1887 1888
// Returns the database name of a tmq result with everything up to and including the
// first '.' stripped, or NULL if res is neither a data nor a meta result.
// If the stored name contains no '.', the whole name is returned.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that would be undefined behaviour.
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1897 1898 1899 1900
// Returns the vgroup id stored in a data or meta result, or -1 for any other handle.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1908 1909 1910 1911 1912 1913 1914 1915

// Returns the table name of the block currently selected by resIter in a data result.
// NULL is returned when res is not a data result, the response carries no table
// names, or resIter is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pData = (SMqRspObj*)res;
  if (!pData->rsp.withTbName || pData->rsp.blockTbName == NULL) {
    return NULL;
  }
  if (pData->resIter < 0 || pData->resIter >= pData->rsp.blockNum) {
    return NULL;
  }

  return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
}
1920

1921
// Wraps the raw meta payload of a meta result in a newly allocated tmq_raw_data.
//
// The payload pointer, its length and its message type are copied from the result;
// the payload bytes themselves are not duplicated. Returns NULL when res is not a
// meta result or the wrapper cannot be allocated.
tmq_raw_data* tmq_get_raw_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  tmq_raw_data* raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
  if (raw == NULL) {
    // Allocation failed: avoid writing through a NULL pointer.
    return NULL;
  }

  SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
  raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
  raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
  raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType;
  return raw;
}

1933 1934
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
                                  int8_t t) {
wmmhello's avatar
wmmhello 已提交
1935 1936 1937 1938 1939 1940 1941
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
wmmhello's avatar
wmmhello 已提交
1942

1943 1944 1945 1946
  //  char uid[32] = {0};
  //  sprintf(uid, "%"PRIi64, id);
  //  cJSON* id_ = cJSON_CreateString(uid);
  //  cJSON_AddItemToObject(json, "id", id_);
wmmhello's avatar
wmmhello 已提交
1947 1948 1949 1950
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
  cJSON_AddItemToObject(json, "tableType", tableType);
1951 1952
  //  cJSON* version = cJSON_CreateNumber(1);
  //  cJSON_AddItemToObject(json, "version", version);
wmmhello's avatar
wmmhello 已提交
1953 1954

  cJSON* columns = cJSON_CreateArray();
1955 1956 1957 1958
  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON*   column = cJSON_CreateObject();
    SSchema* s = schemaRow->pSchema + i;
    cJSON*   cname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1959 1960 1961
    cJSON_AddItemToObject(column, "name", cname);
    cJSON* ctype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(column, "type", ctype);
1962
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1963
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1964
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1965
      cJSON_AddItemToObject(column, "length", cbytes);
1966 1967 1968
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1969 1970
      cJSON_AddItemToObject(column, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1971 1972 1973 1974 1975
    cJSON_AddItemToArray(columns, column);
  }
  cJSON_AddItemToObject(json, "columns", columns);

  cJSON* tags = cJSON_CreateArray();
1976 1977 1978 1979
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON*   tag = cJSON_CreateObject();
    SSchema* s = schemaTag->pSchema + i;
    cJSON*   tname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1980 1981 1982
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(tag, "type", ttype);
1983
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1984
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1985
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1986
      cJSON_AddItemToObject(tag, "length", cbytes);
1987 1988 1989
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1990 1991
      cJSON_AddItemToObject(tag, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1992 1993 1994 1995 1996 1997 1998 1999 2000
    cJSON_AddItemToArray(tags, tag);
  }
  cJSON_AddItemToObject(json, "tags", tags);

  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  return string;
}

2001 2002 2003 2004
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  char*         string = NULL;
wmmhello's avatar
wmmhello 已提交
2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto end;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
2016 2017
  //  cJSON* uid = cJSON_CreateNumber(id);
  //  cJSON_AddItemToObject(json, "uid", uid);
wmmhello's avatar
wmmhello 已提交
2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029
  SName name = {0};
  tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  cJSON* tableName = cJSON_CreateString(name.tname);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  cJSON* alterType = cJSON_CreateNumber(req.alterType);
  cJSON_AddItemToObject(json, "alterType", alterType);
  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
2030 2031
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
2032 2033 2034 2035
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);

2036
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
2037
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
2038
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2039
        cJSON_AddItemToObject(json, "colLength", cbytes);
2040 2041 2042
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2043 2044 2045 2046 2047
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
2048 2049 2050
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
2051 2052 2053 2054
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
2055 2056 2057
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
2058 2059 2060
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);
2061
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
2062
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
2063
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2064
        cJSON_AddItemToObject(json, "colLength", cbytes);
2065 2066 2067
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2068 2069 2070 2071 2072
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
2073 2074 2075 2076
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      cJSON*      colName = cJSON_CreateString(oldField->name);
wmmhello's avatar
wmmhello 已提交
2077 2078 2079 2080 2081 2082 2083 2084 2085 2086
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(newField->name);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

2087
end:
wmmhello's avatar
wmmhello 已提交
2088 2089 2090 2091 2092
  cJSON_Delete(json);
  tFreeSMAltertbReq(&req);
  return string;
}

2093
// Decodes an SVCreateStbReq from the meta payload (skipping the leading SMsgHead)
// and renders it as "create super table" JSON. Returns NULL when decoding fails.
static char* processCreateStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dec;
  char*          jsonText = NULL;

  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) >= 0) {
    jsonText = buildCreateTableJson(&stbReq.schemaRow, &stbReq.schemaTag, stbReq.name, stbReq.suid, TSDB_SUPER_TABLE);
  }

  // The decoder is released on both the success and the failure path.
  tDecoderClear(&dec);
  return jsonText;
}

2115
// Decodes an SVCreateStbReq from the meta payload (skipping the leading SMsgHead)
// and renders the original alter request it carries (alterOriData) as JSON.
// Returns NULL when decoding fails.
static char* processAlterStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dec;
  char*          jsonText = NULL;

  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) >= 0) {
    jsonText = buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen);
  }

  // The decoder is released on both the success and the failure path.
  tDecoderClear(&dec);
  return jsonText;
}

2137 2138
// Serialises a "create child table" event to an unformatted JSON string with the
// keys "type" ("create"), "tableName", "tableType" ("child"), "using" and "tags".
//
// Each tag is rendered as {"name", "type", "value"}. A JSON-typed tag set yields a
// single entry whose value is the text from parseTagDatatoJson(); an empty JSON tag
// set, or a failure of tTagToValArray(), yields an empty "tags" array. `id` is not used.
// Returns NULL only if the root object cannot be created.
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id) {
  char*   string = NULL;
  SArray* pTagVals = NULL;
  cJSON*  json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);

  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("child");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* using = cJSON_CreateString(sname);
  cJSON_AddItemToObject(json, "using", using);

  cJSON*  tags = cJSON_CreateArray();
  int32_t code = tTagToValArray(pTag, &pTagVals);
  if (code) {
    goto end;
  }

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      goto end;
    }
    char*  pJson = parseTagDatatoJson(pTag);
    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, 0);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    cJSON_AddItemToObject(tag, "type", ttype);
    cJSON* tvalue = cJSON_CreateString(pJson);
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
    taosMemoryFree(pJson);
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);

    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, i);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
    cJSON_AddItemToObject(tag, "type", ttype);

    char* buf = NULL;
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      // NOTE(review): dataConverToStr presumably wraps var-length values in quotes,
      // which would need nData + 3 bytes including the terminator; the two extra
      // bytes are harmless if it does not. Confirm against its implementation.
      buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
      if (buf != NULL) {
        dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
      }
    } else {
      buf = taosMemoryCalloc(32, 1);
      if (buf != NULL) {
        dataConverToStr(buf, pTagVal->type, &pTagVal->i64, tDataTypes[pTagVal->type].bytes, NULL);
      }
    }

    // On allocation failure the tag is emitted without a "value" key rather than
    // passing a NULL buffer to the converter.
    if (buf != NULL) {
      cJSON* tvalue = cJSON_CreateString(buf);
      taosMemoryFree(buf);
      cJSON_AddItemToObject(tag, "value", tvalue);
    }
    cJSON_AddItemToArray(tags, tag);
  }

end:
  cJSON_AddItemToObject(json, "tags", tags);
  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  taosArrayDestroy(pTagVals);
  return string;
}

2225
// Decodes an SVCreateTbBatchReq from the meta payload (skipping the leading
// SMsgHead) and renders a "create table" JSON string for child and normal tables.
//
// Returns NULL when decoding fails or the batch contains no child/normal table.
// NOTE(review): `string` is overwritten on every iteration, so only the JSON of the
// last matching request is returned and earlier strings are not freed. This is
// unchanged from before; confirm whether batches with several requests are expected.
static char* processCreateTable(SMqMetaRsp* metaRsp) {
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  char*              string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVCreateTbReq* pCreateReq = req.pReqs + iReq;
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
                                     pCreateReq->ctb.tagName, pCreateReq->uid);
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
      string =
          buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
    }
  }

_exit:
  // Single release point: the success path used to clear the decoder twice.
  tDecoderClear(&decoder);
  return string;
}

2257 2258 2259 2260
// Decodes an SVAlterTbReq from the meta payload (skipping the leading SMsgHead) and
// renders it as an unformatted JSON string with the keys "type" ("alter"),
// "tableName", "tableType" ("child" for tag-value updates, otherwise "normal"),
// "alterType" and the action-specific keys "colName", "colType", "colLength",
// "colNewName", "colValue" and "colValueNull".
//
// Returns NULL when decoding fails or the root object cannot be created.
static char* processAlterTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  char*        string = NULL;
  cJSON*       json = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  cJSON_AddItemToObject(json, "alterType", alterType);

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      cJSON_AddItemToObject(json, "colType", colType);

      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      cJSON_AddItemToObject(json, "colType", colType);
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      cJSON_AddItemToObject(json, "colName", tagName);

      bool isNull = vAlterTbReq.isNull;
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        // An empty JSON tag set is reported as a null value.
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true);
          buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
        } else {
          // NOTE(review): nTagVal + 1 bytes is presumably too small for the textual
          // form of a fixed-width value, or for a quoted var-length value.
          // buildCreateCTableJson reserves 32 bytes for fixed-width values; use at
          // least that here, plus room for two quotes. Confirm against dataConverToStr.
          int32_t bufLen = vAlterTbReq.nTagVal + 3;
          if (bufLen < 32) {
            bufLen = 32;
          }
          buf = taosMemoryCalloc(bufLen, 1);
          if (buf != NULL) {
            dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
          }
        }

        // Without a buffer no "colValue" key is emitted.
        if (buf != NULL) {
          cJSON* colValue = cJSON_CreateString(buf);
          cJSON_AddItemToObject(json, "colValue", colValue);
          taosMemoryFree(buf);
        }
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      cJSON_AddItemToObject(json, "colValueNull", isNullCJson);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

_exit:
  // Release the JSON tree (previously leaked); json is NULL on the early-exit paths.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2370 2371 2372 2373
// Render a "drop super table" meta response as a compact JSON string.
//
// The payload that follows the SMsgHead in metaRsp->metaRsp is decoded as an
// SVDropStbReq and emitted as
//   {"type":"drop","tableName":<req.name>,"tableType":"super"}.
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when
// decoding or JSON object creation fails. The caller owns the returned string.
static char* processDropSTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  cJSON*       json = NULL;  // declared up front so the shared exit path can always release it
  char*        string = NULL;

  // decode: skip the message head, the request body follows it
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(req.name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is an independent allocation; the tree itself must be
  // freed here or it leaks on every call. cJSON_Delete() accepts NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2401 2402 2403 2404
// Render a batched "drop table" meta response as a compact JSON string.
//
// The payload that follows the SMsgHead in metaRsp->metaRsp is decoded as an
// SVDropTbBatchReq and emitted as
//   {"type":"drop","tableNameList":[<name of each request>, ...]}.
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when
// decoding or JSON object creation fails. The caller owns the returned string.
static char* processDropTable(SMqMetaRsp* metaRsp) {
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  cJSON*           json = NULL;  // declared up front so the shared exit path can always release it
  char*            string = NULL;

  // decode: skip the message head, the request body follows it
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);

  // one array entry per dropped table, in request order
  cJSON* tableNameList = cJSON_CreateArray();
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;

    cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
    cJSON_AddItemToArray(tableNameList, tableName);
  }
  cJSON_AddItemToObject(json, "tableNameList", tableNameList);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is an independent allocation; the tree itself must be
  // freed here or it leaks on every call. cJSON_Delete() accepts NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2441
// Convert a TMQ meta result into its JSON text form.
//
// Returns NULL when `res` is not a TMQ meta result or when its message type is
// not one of the six handled DDL types; otherwise returns whatever the
// matching process* helper returns.
char* tmq_get_json_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  SMqMetaRsp* pRsp = &((SMqMetaRspObj*)res)->metaRsp;

  // dispatch on the DDL message type carried by the response
  switch (pRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      return processCreateStb(pRsp);
    case TDMT_VND_ALTER_STB:
      return processAlterStb(pRsp);
    case TDMT_VND_DROP_STB:
      return processDropSTable(pRsp);
    case TDMT_VND_CREATE_TABLE:
      return processCreateTable(pRsp);
    case TDMT_VND_ALTER_TABLE:
      return processAlterTable(pRsp);
    case TDMT_VND_DROP_TABLE:
      return processDropTable(pRsp);
    default:
      return NULL;
  }
}

2463
// Release a JSON string previously handed out to the caller.
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
wmmhello's avatar
wmmhello 已提交
2464

2465
// Replay a vnode "create super table" meta message on the connection `taos`.
//
// `meta` points at a buffer that starts with an SMsgHead followed by an encoded
// SVCreateStbReq; `metaLen` is the size of the whole buffer. The decoded schema
// is converted into an SMCreateStbReq (igExists = true, source =
// TD_REQ_FROM_TAOX) and sent as TDMT_MND_CREATE_STB.
//
// Returns TSDB_CODE_SUCCESS or an error code: the buildRequest() failure,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no current database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
// allocation failure, or the code left in the request by launchQueryImpl().
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateStbReq req = {0};
  // Zero-initialized because `end` calls tDecoderClear() even on paths that
  // bail out before tDecoderInit() has run.
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build create stable: copy column schema
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
  if (NULL == pReq.pColumns) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema* pSchema = req.schemaRow.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pColumns, &field);
  }

  // copy tag schema
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  if (NULL == pReq.pTags) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = req.suid;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  // qualify the table name with the account and the request's current database
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // serialize: first call sizes the buffer, second call fills it
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}

2546
// Replay a vnode "drop super table" meta message on the connection `taos`.
//
// `meta` points at a buffer that starts with an SMsgHead followed by an encoded
// SVDropStbReq; `metaLen` is the size of the whole buffer. The request is
// re-issued as TDMT_MND_DROP_STB with igNotExists = true and source =
// TD_REQ_FROM_TAOX.
//
// Returns TSDB_CODE_SUCCESS or an error code: the buildRequest() failure,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no current database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
// allocation failure, or the code left in the request by launchQueryImpl().
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropStbReq req = {0};
  // Zero-initialized because `end` calls tDecoderClear() even on paths that
  // bail out before tDecoderInit() has run.
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.suid = req.suid;

  // qualify the table name with the account and the request's current database
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // serialize: first call sizes the buffer, second call fills it
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}

// Per-vgroup accumulator used by taosCreateTable(): one entry is stored in the
// vgroup hash map for each distinct vgId.
typedef struct SVgroupCreateTableBatch {
  SVCreateTbBatchReq req;   // req.pArray collects the SVCreateTbReq entries routed to this vgroup
  SVgroupInfo        info;  // vgroup returned by catalogGetTableHashVgroup() for those tables
  char               dbName[TSDB_DB_NAME_LEN];  // copied from the request's current database (pRequest->pDb)
} SVgroupCreateTableBatch;

static void destroyCreateTbReqBatch(void* data) {
2614
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
wmmhello's avatar
wmmhello 已提交
2615 2616 2617
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2618 2619 2620 2621 2622 2623 2624
// Replay a vnode "create table" batch meta message on the connection `taos`.
//
// `meta` points at a buffer that starts with an SMsgHead followed by an encoded
// SVCreateTbBatchReq; `metaLen` is the size of the whole buffer. Each table in
// the batch is routed to its vgroup via the catalog, the requests are grouped
// per vgroup, serialized, and scheduled as TDMT_VND_CREATE_TABLE.
//
// Returns TSDB_CODE_SUCCESS or an error code: the failure of buildRequest(),
// catalogGetHandle(), catalogGetTableHashVgroup() or
// rewriteToVnodeModifyOpStmt(); TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request
// has no current database; TSDB_CODE_INVALID_PARA when decoding fails;
// TSDB_CODE_OUT_OF_MEMORY on allocation failure; or the code left in the
// request by launchQueryImpl().
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  // vgId -> SVgroupCreateTableBatch; the free callback releases each batch's array
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // group every create-table request by the vgroup its table name hashes to
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      strcpy(tBatch.dbName, pRequest->pDb);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pCreateReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pCreateReq);
    }
  }

  // Build the query node before serializing so that an allocation failure
  // here cannot strand the serialized buffers; qDestroyQuery() at `end`
  // releases whatever was built.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): pBufArray is not released here when the rewrite fails;
  // confirm whether rewriteToVnodeModifyOpStmt() takes ownership on failure.
  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

// Per-vgroup accumulator used by taosDropTable(): one entry is stored in the
// vgroup hash map for each distinct vgId.
typedef struct SVgroupDropTableBatch {
  SVDropTbBatchReq req;   // req.pArray collects the SVDropTbReq entries routed to this vgroup
  SVgroupInfo      info;  // vgroup returned by catalogGetTableHashVgroup() for those tables
  char             dbName[TSDB_DB_NAME_LEN];  // left zeroed by taosDropTable()
} SVgroupDropTableBatch;

// Hash-map value destructor for SVgroupDropTableBatch entries: releases the
// request array held by the batch.
static void destroyDropTbReqBatch(void* data) {
  taosArrayDestroy(((SVgroupDropTableBatch*)data)->req.pArray);
}

L
Liu Jicong 已提交
2731 2732 2733 2734 2735 2736 2737
// Replay a vnode "drop table" batch meta message on the connection `taos`.
//
// `meta` points at a buffer that starts with an SMsgHead followed by an encoded
// SVDropTbBatchReq; `metaLen` is the size of the whole buffer. Every request is
// forced to igNotExists = true, routed to its vgroup via the catalog, grouped
// per vgroup, serialized, and scheduled as TDMT_VND_DROP_TABLE.
//
// Returns TSDB_CODE_SUCCESS or an error code: the failure of buildRequest(),
// catalogGetHandle(), catalogGetTableHashVgroup() or
// rewriteToVnodeModifyOpStmt(); TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request
// has no current database; TSDB_CODE_INVALID_PARA when decoding fails;
// TSDB_CODE_OUT_OF_MEMORY on allocation failure; or the code left in the
// request by launchQueryImpl().
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  // vgId -> SVgroupDropTableBatch; the free callback releases each batch's array
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // group every drop-table request by the vgroup its table name hashes to
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pDropReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pDropReq);
    }
  }

  // Build the query node before serializing so that an allocation failure
  // here cannot strand the serialized buffers; qDestroyQuery() at `end`
  // releases whatever was built.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): pBufArray is not released here when the rewrite fails;
  // confirm whether rewriteToVnodeModifyOpStmt() takes ownership on failure.
  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

2832 2833 2834 2835 2836 2837 2838 2839
// Replay a vnode "alter table" meta message on the connection `taos`.
//
// `meta` points at a buffer that starts with an SMsgHead followed by an encoded
// SVAlterTbReq; `metaLen` is the size of the whole buffer. The request is
// decoded only to learn the action and table name; the raw buffer is then
// copied, its head re-targeted at the table's vgroup, and scheduled as
// TDMT_VND_ALTER_TABLE.
//
// TSDB_ALTER_TABLE_UPDATE_OPTIONS requests are skipped and reported as success,
// and a TSDB_CODE_VND_TABLE_NOT_EXIST result from the query is mapped to 0.
//
// Returns TSDB_CODE_SUCCESS or an error code: the failure of buildRequest(),
// catalogGetHandle(), catalogGetTableHashVgroup() or
// rewriteToVnodeModifyOpStmt(); TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request
// has no current database; TSDB_CODE_INVALID_PARA when decoding fails;
// TSDB_CODE_OUT_OF_MEMORY on allocation failure; or the code left in the
// request by launchQueryImpl().
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       coder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // find the vgroup that owns the table being altered
  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pArray = taosArrayInit(1, sizeof(void*));
  if (NULL == pArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == pVgData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pVgData->vg = pInfo;
  pVgData->pData = taosMemoryMalloc(metaLen);
  if (NULL == pVgData->pData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // forward the raw message unchanged except for the target vgId in its head
  memcpy(pVgData->pData, meta, metaLen);
  ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
  pVgData->size = metaLen;
  pVgData->numOfTables = 1;
  // On a failed push pVgData is still owned locally and is freed at `end`;
  // continuing would drop the only reference once pVgData is cleared below.
  if (NULL == taosArrayPush(pArray, &pVgData)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  // cleared so the cleanup below does not touch them after the launch
  pQuery = NULL;  // no need to free in the end
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
    code = 0;
  }

end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

2938
// Apply a raw meta (DDL) message to the connection `taos`.
//
// Dispatches on raw_meta->raw_meta_type to the matching taos* helper and
// returns its result. Returns TSDB_CODE_INVALID_PARA when either argument is
// NULL or the type is not one of the six handled DDL types.
int32_t taos_write_raw_meta(TAOS* taos, tmq_raw_data* raw_meta) {
  if (taos == NULL || raw_meta == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  switch (raw_meta->raw_meta_type) {
    // NOTE(review): ALTER_STB is routed through taosCreateStb() just like
    // CREATE_STB; presumably intentional (that helper sets igExists) - confirm.
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB:
      return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_DROP_STB:
      return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_CREATE_TABLE:
      return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_ALTER_TABLE:
      return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_DROP_TABLE:
      return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    default:
      return TSDB_CODE_INVALID_PARA;
  }
}

2959 2960
// Release a raw meta descriptor previously handed out to the caller.
void tmq_free_raw_meta(tmq_raw_data* rawMeta) {
  taosMemoryFreeClear(rawMeta);
}

L
Liu Jicong 已提交
2964
// Commit through tmqCommitInner2() with the fourth argument set to 1 and the
// caller's callback/param forwarded; the inner return value is discarded.
// NOTE(review): the fourth argument presumably selects async mode - confirm
// against tmqCommitInner2().
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}

2969 2970 2971 2972
// Commit through tmqCommitInner2() with the fourth argument set to 0 and no
// callback, returning its result code unchanged.
// NOTE(review): the fourth argument presumably selects sync mode - confirm
// against tmqCommitInner2().
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL);
  return code;
}