tmq.c 90.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
int32_t tmqAskEp(tmq_t* tmq, bool async);

/* Process-wide TMQ state shared by all consumers in this file. */
typedef struct {
  int8_t inited;  // 0 until tmq_consumer_new() flips it to 1 with a compare-and-swap
  tmr_h  timer;   // timer handle returned by taosTmrInit(); passed to every taosTmrReset() call here
} SMqMgmt;

/* Single shared instance, zero-initialised so `inited` starts at 0. */
static SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
/*
 * Common prefix of the response wrappers in this file: SMqAskEpRspWrapper and
 * SMqPollRspWrapper both start with these two fields, and tmqClearUnhandleMsg()
 * handles queued items through this type.
 */
typedef struct {
  int8_t  tmqRspType;  // discriminator for the concrete wrapper type
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60 61 62
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
  int8_t  ssEnable;
  int32_t ssBatchSize;

  bool hbBgEnable;

63 64 65 66 67
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
68
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
69
  void*          commitCbUserParam;
L
Liu Jicong 已提交
70 71 72
};

struct tmq_t {
L
Liu Jicong 已提交
73
  // conf
74 75 76 77 78 79 80 81 82 83 84
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
85 86
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
87 88 89 90

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
91 92
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
93
  int32_t epSkipCnt;
L
Liu Jicong 已提交
94
#endif
L
Liu Jicong 已提交
95 96
  int64_t pollCnt;

L
Liu Jicong 已提交
97
  // timer
98 99
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
100 101 102
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
103 104 105 106
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
107
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
108
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
109
  STaosQall*  qall;
L
Liu Jicong 已提交
110 111 112 113
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
114 115
};

L
Liu Jicong 已提交
116 117
/* Raw meta payload: an opaque buffer plus its length and a type tag. */
struct tmq_raw_data {
  void*   raw_meta;
  int32_t raw_meta_len;   // presumably the byte length of raw_meta -- confirm with the producer
  int16_t raw_meta_type;
};

X
Xiaoyu Wang 已提交
122 123 124 125 126 127 128 129
/* Per-vgroup status values; presumably stored in SMqClientVg.vgStatus -- confirm at the use sites. */
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

/* Consumer lifecycle values; presumably stored in tmq_t.status -- confirm at the use sites. */
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
};

L
Liu Jicong 已提交
133
/*
 * Task tags written into tmq_t.delayedTask by the tmqAssign*Task() timer
 * callbacks and dispatched by tmqHandleAllDelayedTask().
 */
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
139
typedef struct {
140 141 142
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
143 144 145 146
  /*int64_t      committedOffset;*/
  /*int64_t      currentOffset;*/
  STqOffsetVal committedOffsetNew;
  STqOffsetVal currentOffsetNew;
L
Liu Jicong 已提交
147
  // connection info
148
  int32_t vgId;
X
Xiaoyu Wang 已提交
149
  int32_t vgStatus;
L
Liu Jicong 已提交
150
  int32_t vgSkipCnt;
151 152 153
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
154
typedef struct {
155
  // subscribe info
L
Liu Jicong 已提交
156
  char* topicName;
L
Liu Jicong 已提交
157
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
158 159 160

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
161 162
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
163 164
} SMqClientTopic;

L
Liu Jicong 已提交
165 166 167 168 169
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
170
  union {
L
Liu Jicong 已提交
171 172
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
173
  };
L
Liu Jicong 已提交
174 175
} SMqPollRspWrapper;

L
Liu Jicong 已提交
176
typedef struct {
L
Liu Jicong 已提交
177 178 179
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
180
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
typedef struct {
183
  tmq_t*  tmq;
L
Liu Jicong 已提交
184
  int32_t code;
L
Liu Jicong 已提交
185
  int32_t async;
X
Xiaoyu Wang 已提交
186
  tsem_t  rspSem;
187 188
} SMqAskEpCbParam;

L
Liu Jicong 已提交
189
typedef struct {
L
Liu Jicong 已提交
190 191
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
192
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
193
  int32_t         epoch;
L
Liu Jicong 已提交
194
  int32_t         vgId;
L
Liu Jicong 已提交
195
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
196
} SMqPollCbParam;
197

L
Liu Jicong 已提交
198
/* Disabled: context type of the legacy single-request commit path (tmqCommitCb / tmqCommitInner). */
#if 0
typedef struct {
  tmq_t*         tmq;
  int8_t         async;
  int8_t         automatic;
  int8_t         freeOffsets;
  tmq_commit_cb* userCb;
  tsem_t         rspSem;
  int32_t        rspErr;
  SArray*        offsets;
  void*          userParam;
} SMqCommitCbParam;
#endif
L
Liu Jicong 已提交
211

212
typedef struct {
L
Liu Jicong 已提交
213 214 215 216
  tmq_t* tmq;
  int8_t automatic;
  int8_t async;
  /*int8_t         freeOffsets;*/
L
Liu Jicong 已提交
217 218
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
219
  int32_t        rspErr;
220
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
221 222 223 224
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
225 226 227 228 229 230 231
} SMqCommitCbParamSet;

/* Per-request context passed to tmqCommitCb2(); built in tmqSendCommitReq(). */
typedef struct {
  SMqCommitCbParamSet* params;   // set shared by every request of the same commit call
  STqOffset*           pOffset;  // offset record allocated by tmqSendCommitReq() for this request
} SMqCommitCbParam2;

232
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
233
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
234
  conf->withTbName = false;
L
Liu Jicong 已提交
235
  conf->autoCommit = true;
L
Liu Jicong 已提交
236
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
237
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
238 239 240
  return conf;
}

L
Liu Jicong 已提交
241
/*
 * Release a configuration object together with the connection strings it owns.
 * Passing NULL is a no-op.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  // ip/user/pass are heap copies made by tmq_conf_set().
  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
251 252
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
253
    return TMQ_CONF_OK;
254
  }
L
Liu Jicong 已提交
255

256 257
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
258 259
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
260

L
Liu Jicong 已提交
261 262
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
263
      conf->autoCommit = true;
L
Liu Jicong 已提交
264 265
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
266
      conf->autoCommit = false;
L
Liu Jicong 已提交
267 268 269 270
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
271
  }
L
Liu Jicong 已提交
272

L
Liu Jicong 已提交
273 274 275 276 277
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
292

293 294
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
295
      conf->withTbName = true;
L
Liu Jicong 已提交
296
      return TMQ_CONF_OK;
297
    } else if (strcmp(value, "false") == 0) {
298
      conf->withTbName = false;
L
Liu Jicong 已提交
299
      return TMQ_CONF_OK;
300 301 302 303 304
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
305
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
306
    if (strcmp(value, "true") == 0) {
307 308 309 310 311 312 313 314 315 316 317 318 319
      conf->ssEnable = true;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
      conf->ssEnable = false;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
320 321
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
322
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
323 324 325 326
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
327
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
328 329
  }

L
Liu Jicong 已提交
330
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
331
    conf->ssBatchSize = atoi(value);
L
Liu Jicong 已提交
332 333 334
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
335
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
336 337 338
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
339
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
340 341 342
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
343
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
344 345 346
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
347
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
348 349 350
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
351
  if (strcmp(key, "td.connect.db") == 0) {
352
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
353 354 355
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
356
  return TMQ_CONF_UNKNOWN;
357 358 359
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
360 361
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
362 363
}

L
Liu Jicong 已提交
364 365
/*
 * Append a private copy of `src` to the list.
 *
 * Returns 0 on success and -1 when an argument is NULL, the copy cannot be
 * allocated, or the array push fails. On failure the list is left unchanged.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(container, &topic) == NULL) {
    // The copy never made it into the list, so nobody else will free it.
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
371
/*
 * Destroy a topic list, freeing every stored topic string with taosMemoryFree.
 * Passing NULL is a no-op.
 */
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
376 377 378 379 380 381 382 383 384 385
/* Number of topics currently stored in the list. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/*
 * Expose the list's backing storage as an array of C strings.
 * The pointer aliases the list's own data; it is not a copy.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
386 387 388 389
/*
 * Format "<topicName>:<vg>" into `dst` and return the number of characters
 * written (excluding the terminator). The write is unbounded, so the caller
 * must supply a buffer large enough for the result.
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
390
/* Disabled: legacy commit callback, superseded by tmqCommitCb2(). */
#if 0
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code;
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
    }

    if (pParam->freeOffsets) {
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}
#endif
L
Liu Jicong 已提交
412

D
dapan1121 已提交
413
/*
 * Response callback for one commit request sent by tmqSendCommitReq().
 *
 * param  SMqCommitCbParam2 built by tmqSendCommitReq()
 * pBuf   response buffer handed over by the transport
 * code   result of this request
 *
 * Records a failing `code` in the shared set, releases the per-request offset
 * record and the response payload, then counts the request down. When the
 * last outstanding response arrives, either the matching commit callback is
 * invoked (async) or the waiting caller is woken (sync). Always returns 0.
 */
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  // Keep the failure so the callback / sync caller sees it; previously `code`
  // was dropped and rspErr stayed at its zero-initialised value.
  if (code != 0) {
    pParamSet->rspErr = code;
  }

  // The offset record was only needed to build the request.
  if (pParam->pOffset) {
    taosMemoryFree(pParam->pOffset);
    pParam->pOffset = NULL;
  }
  // Same payload handling as tmqHbCb().
  if (pBuf && pBuf->pData) {
    taosMemoryFree(pBuf->pData);
    pBuf->pData = NULL;
  }

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum != 0) {
    return 0;
  }

  // no more waiting rsp
  if (!pParamSet->async) {
    tsem_post(&pParamSet->rspSem);
    return 0;
  }

  // NOTE(review): pParamSet is not released on the async path. Freeing it here
  // is only safe once the sender can no longer issue further requests against
  // the same set -- confirm the ownership before adding a free.
  if (pParamSet->automatic && pParamSet->tmq->commitCb) {
    pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
  } else if (!pParamSet->automatic && pParamSet->userCb) {
    pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
454 455 456 457 458 459 460
/*
 * Build and send one offset-commit request for `pVg` of `pTopic`.
 *
 * The committed value is pVg->currentOffsetNew, keyed by
 * "<groupId><TMQ_SEPARATOR><topicName>". The response is delivered to
 * tmqCommitCb2() with a freshly allocated SMqCommitCbParam2.
 *
 * Returns 0 once the request has been handed to the transport and -1 when it
 * could not be built; nothing is sent and nothing is leaked in that case.
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  void*              buf = NULL;
  SMqCommitCbParam2* pParam = NULL;
  SMsgSendInfo*      pMsgSendInfo = NULL;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffsetNew;

  // NOTE(review): subKey is filled without a length check; its capacity is not
  // visible here -- confirm group id + separator + topic name always fit.
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  // The encoded offset follows the message head.
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffsetNew = pVg->currentOffsetNew;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->fp = tmqCommitCb2;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // Count the request before it is sent: the response callback decrements
  // waitingRspNum and may run before asyncSendMsgToServer() returns.
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  pParamSet->totalRspNum++;

  // send msg
  // NOTE(review): the send result is ignored, as before; whether the callback
  // still fires after a failed send is not visible here.
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  // Nothing was handed to the transport, so everything is still owned here.
  if (pParam) taosMemoryFree(pParam);
  if (buf) taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

/*
 * Commit the offset of the topic/vgroup that `msg` was polled from.
 *
 * msg        a TMQ data or meta result; must not be NULL
 * async      0: block until the response arrives and return its result;
 *            non-zero: return immediately, completion goes through tmqCommitCb2()
 * userCb     callback for the async case (may be NULL)
 * userParam  opaque argument for userCb
 *
 * Returns TSDB_CODE_TMQ_INVALID_MSG for any other result type and -1 when the
 * bookkeeping cannot be allocated. Returns 0 when nothing needs committing.
 * If the request cannot be built, a sync call returns -1; an async call reports
 * -1 through userCb and returns 0.
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = -1;

  // Find the vgroup the message came from and commit it if its offset moved.
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
        }
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    // Nothing was sent: no callback will ever touch the set.
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    // The single request has been answered; the set is no longer referenced.
    taosMemoryFree(pParamSet);
    return code;
  }

  // Async: completion is reported by tmqCommitCb2().
  // NOTE(review): pParamSet is not released on this path.
  return 0;

FAIL:
  // The request was never sent, so the set is still exclusively ours.
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  if (async) {
    if (userCb) userCb(tmq, code, userParam);
    return 0;
  }
  return code;
}

/*
 * Commit offsets.
 *
 * With a non-NULL `msg` this delegates to tmqCommitMsgImpl() (one vgroup).
 * With msg == NULL every vgroup of every subscribed topic whose current offset
 * differs from its committed offset gets a commit request.
 *
 * automatic  non-zero: completion is reported through tmq->commitCb
 * async      0: block until woken by tmqCommitCb2() and return the recorded
 *            result; non-zero: return 0 right after sending
 * userCb / userParam  callback used when automatic == 0
 *
 * Returns -1 if the bookkeeping cannot be allocated and 0 if nothing needed
 * committing. Requests that fail to build are skipped (best effort).
 */
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                        void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        // Best effort: a vgroup whose request cannot be built is skipped.
        (void)tmqSendCommitReq(tmq, pVg, pTopic, pParamSet);
      }
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // Nothing was sent: no callback will ever touch the set.
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (async) {
    // Completion (and any error) is reported by tmqCommitCb2().
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  // NOTE(review): pParamSet is intentionally not freed here. With several
  // requests in flight the semaphore can be posted before the last response
  // has been processed, so a callback may still reference the set.
  return code;
}

L
Liu Jicong 已提交
663 664
/* Disabled: legacy commit path that sent all offsets to the mnode in one request. */
#if 0
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
  int8_t               freeOffsets;
  int32_t              code = -1;

  if (msg == NULL) {
    freeOffsets = 1;
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
    freeOffsets = 0;
    pOffsets = (SArray*)&msg->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  SEncoder encoder;

  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  if (code < 0) {
    goto END;
  }
  int32_t tlen = encoder.pos;
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tEncoderClear(&encoder);
    goto END;
  }
  tEncoderClear(&encoder);

  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->freeOffsets = freeOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  } else {
    code = 0;
  }

  // avoid double free if msg is sent
  buf = NULL;

END:
  if (buf) taosMemoryFree(buf);
  /*if (pParam) taosMemoryFree(pParam);*/
  /*if (sendInfo) taosMemoryFree(sendInfo);*/

  if (code != 0 && async) {
    if (automatic) {
      tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
    } else {
      userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

  if (!async && freeOffsets) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}
#endif
L
Liu Jicong 已提交
779

780
void tmqAssignAskEpTask(void* param, void* tmrId) {
L
Liu Jicong 已提交
781
  tmq_t*  tmq = (tmq_t*)param;
782
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
783
  *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
L
Liu Jicong 已提交
784
  taosWriteQitem(tmq->delayedTask, pTaskType);
785
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
786 787 788 789
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
790
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
791 792
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
793
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
794 795 796 797
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
798
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
799 800
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
801
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
802 803
}

804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
/*
 * Heartbeat response callback: the reply carries nothing of interest, so its
 * payload (if any) is simply released. `param` and `code` are ignored.
 * Always returns 0.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL && pMsg->pData != NULL) {
    taosMemoryFree(pMsg->pData);
  }
  return 0;
}

/*
 * Timer callback: send one heartbeat (consumer id + epoch) to the mnode and
 * re-arm the heartbeat timer for another 1000 ms. If the request cannot be
 * allocated the heartbeat is skipped for this tick, but the timer is still
 * re-armed. `tmrId` is unused.
 */
void tmqSendHbReq(void* param, void* tmrId) {
  // TODO replace with ref
  tmq_t*    tmq = (tmq_t*)param;
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
    // Previously execution fell through and dereferenced the NULL sendInfo.
    goto OVER;
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
  taosTmrReset(tmqSendHbReq, 1000, tmq, tmqMgmt.timer, &tmq->hbLiveTimer);
}

L
Liu Jicong 已提交
844 845 846 847 848 849 850 851
/*
 * Drain tmq->delayedTask and run every queued task:
 *   ASK_EP -> tmqAskEp(async) and re-arm epTimer for 1000 ms
 *   COMMIT -> automatic async commit and re-arm commitTimer for autoCommitInterval
 *   REPORT -> nothing yet
 *
 * Returns 0, or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the temporary
 * queue snapshot cannot be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    // Reset every round so an empty read is recognised as the end.
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    switch (*pTaskType) {
      case TMQ_DELAYED_TASK__ASK_EP:
        tmqAskEp(tmq, true);
        taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
        break;
      case TMQ_DELAYED_TASK__COMMIT:
        tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
        break;
      case TMQ_DELAYED_TASK__REPORT:
        break;
      default:
        ASSERT(0);
        break;
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
868
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
869
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
870 871 872 873 874 875 876 877
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
878
  msg = NULL;
L
Liu Jicong 已提交
879 880 881 882 883 884 885 886 887 888
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
889
/*
 * Subscribe response callback: store the result code in the
 * SMqSubscribeCbParam passed as `param` and wake the waiting caller.
 * `pMsg` is not used. Always returns 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  /*tmq_t* tmq = pParam->tmq;*/
  tsem_post(&pParam->rspSem);
  return 0;
}
896

L
Liu Jicong 已提交
897
/*
 * Append the names of all currently subscribed topics to `*topics`,
 * creating the list first when `*topics` is NULL.
 *
 * Each stored topic name is reported without the prefix up to and including
 * its first '.'; a name without a '.' is reported unchanged.
 *
 * Returns 0 on success and -1 if the list cannot be created or extended
 * (topics appended before the failure stay in the list).
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) return -1;
  }

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr() yields NULL when there is no '.'; do not do arithmetic on it.
    const char* dot = strchr(topic->topicName, '.');
    const char* shortName = (dot != NULL) ? dot + 1 : topic->topicName;

    if (tmq_list_append(*topics, shortName) < 0) return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
908 909 910
/*
 * Drop all subscriptions by subscribing to an empty topic list.
 * Returns whatever tmq_subscribe() returns, or TSDB_CODE_OUT_OF_MEMORY
 * (also stored in terrno) when the temporary list cannot be created.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
915
#if 0
916
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
917
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
918 919 920 921 922 923
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
924
  pTmq->epoch = 0;
L
fix txn  
Liu Jicong 已提交
925
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
926
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
927
  // set conf
928 929
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
930
  pTmq->autoCommit = conf->autoCommit;
931
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
932
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
933

L
Liu Jicong 已提交
934 935 936 937 938 939 940 941 942 943
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
944 945
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
946 947
  return pTmq;
}
L
Liu Jicong 已提交
948
#endif
L
Liu Jicong 已提交
949

L
Liu Jicong 已提交
950
/*
 * Creates a TMQ consumer from conf.
 *
 * The call that flips tmqMgmt.inited from 0 to 1 also creates the shared TMQ
 * timer. The consumer receives a copy of the configuration values, a generated
 * consumer id, its topic array, message queue, staging qall and delayed-task
 * queue, a response semaphore and a connection opened with
 * taos_connect_internal(). When conf->hbBgEnable is set, a heartbeat timer is
 * started as well.
 *
 * errstr and errstrLen are not used by this implementation.
 *
 * Returns the new consumer, or NULL on failure. terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY when the timer, the consumer object or one of its
 * containers cannot be allocated.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      // roll the flag back so that a later call can retry
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here, so only values from conf can be reported
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf (done before any step that can fail so that failure logs carry the real id and group)
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->ssEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // containers
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    // the semaphore is the only resource not released under FAIL
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  if (pTmq->hbBgEnable) {
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1041 1042
#if 0
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
L
Liu Jicong 已提交
1043
  return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
L
Liu Jicong 已提交
1044
}
L
Liu Jicong 已提交
1045
#endif
L
Liu Jicong 已提交
1046

L
Liu Jicong 已提交
1047
/*
 * Subscribes the consumer to the topics in topic_list and waits for the result.
 *
 * Each entry of the list is expanded to a full topic name, the request is
 * serialized and sent to the mnode endpoint with tmqSubscribeCb as callback,
 * and the caller blocks on a local semaphore until the callback posts it.
 * After a successful response the endpoint information is fetched
 * synchronously (retrying every 500 ms while the consumer is reported as not
 * ready), and the ask-ep timer and, if auto commit is enabled, the commit timer
 * are started when they are not running yet.
 *
 * Returns 0 on success, the response error code if the server rejected the
 * request, or -1 if the request could not be built.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    // the array stores the pointer; the names are released with the array under FAIL
    taosArrayPush(req.topicNames, &topicFName);
  }

  // first call computes the encoded length, second call writes into the buffer
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  // lives on this stack frame; safe because this function waits for the callback below
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .tmq = tmq,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // sendInfo has not been handed to the transport yet, so it must be released here
    taosMemoryFree(sendInfo);
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1144
/*
 * Stores the commit callback and the user parameter that accompanies it in the
 * configuration object. tmq_consumer_new copies both values into the consumer.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1149

D
dapan1121 已提交
1150
/*
 * Response callback for a poll request sent to one vgroup.
 *
 * param is the SMqPollCbParam built by the sender; it is copied to locals and
 * freed immediately. The response buffer pMsg->pData is released by this
 * function on every path.
 *
 * - code != 0: the message is dropped. For TSDB_CODE_TQ_NO_COMMITTED_OFFSET an
 *   end-of-data wrapper (TMQ_MSG_TYPE__END_RSP) is queued first. The function
 *   then leaves through CREATE_MSG_FAIL.
 * - response epoch older than the consumer epoch: dropped, returns 0.
 * - otherwise the data or meta response is decoded into a SMqPollRspWrapper,
 *   written to tmq->mqueue and tmq->rspSem is posted.
 *
 * CREATE_MSG_FAIL puts the vgroup back to idle if the request epoch still
 * matches the consumer epoch, posts tmq->rspSem and returns -1.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  tmq_t*          tmq = pParam->tmq;
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        // pMsg->pData was already released above; freeing it again here would be a double free
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // the payload follows the SMqRspHead; the head itself is copied over the start of the decoded struct
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);

  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;
CREATE_MSG_FAIL:
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

1235 1236 1237 1238 1239
/*
 * Replaces the consumer's topic/vgroup table with the one carried in pRsp and
 * moves the consumer to the given epoch.
 *
 * The current offset of every known vgroup is first saved in a hash keyed by
 * "<topicName>:<vgId>". The new table is then built from pRsp; a vgroup that
 * already existed keeps its saved offset, any other starts from an offset whose
 * type is tmq->resetOffsetCfg. Finally the consumer status is set to NO_TOPIC
 * or READY depending on whether the new table is empty, and the epoch is stored.
 *
 * Returns true if at least one vgroup was added to the new table, false
 * otherwise (including when the temporary containers cannot be allocated, in
 * which case the consumer is left unchanged).
 */
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // remember the offset of every vgroup currently known
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        // bounded formatting: the key can never run past vgKey
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, 80, &pVgCur->currentOffsetNew);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal));
      }
    }
  }

  // build the new table from the response
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      // default for a vgroup seen for the first time; overridden by the saved offset if there is one
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffsetNew = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1318
#if 0
L
Liu Jicong 已提交
1319
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
L
Liu Jicong 已提交
1320
  /*printf("call update ep %d\n", epoch);*/
X
Xiaoyu Wang 已提交
1321
  bool    set = false;
L
Liu Jicong 已提交
1322 1323
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
S
Shengliang Guan 已提交
1324
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
L
Liu Jicong 已提交
1325
           topicNumGet);
L
Liu Jicong 已提交
1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
X
Xiaoyu Wang 已提交
1338 1339
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
L
Liu Jicong 已提交
1340
    topic.schema = pTopicEp->schema;
L
Liu Jicong 已提交
1341
    taosHashClear(pHash);
X
Xiaoyu Wang 已提交
1342
    topic.topicName = strdup(pTopicEp->topic);
L
Liu Jicong 已提交
1343
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
1344

S
Shengliang Guan 已提交
1345
    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);
L
Liu Jicong 已提交
1346 1347 1348 1349 1350 1351
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
S
Shengliang Guan 已提交
1352
        tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
L
Liu Jicong 已提交
1353 1354 1355 1356
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
S
Shengliang Guan 已提交
1357
          tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
L
Liu Jicong 已提交
1358 1359 1360 1361 1362 1363 1364 1365 1366
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
X
Xiaoyu Wang 已提交
1367
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
L
Liu Jicong 已提交
1368 1369 1370
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
S
Shengliang Guan 已提交
1371
      tscDebug("consumer:%" PRId64 ", (epoch %d) original offset of vgId:%d is %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
L
Liu Jicong 已提交
1372 1373
      if (pOffset != NULL) {
        offset = *pOffset;
S
Shengliang Guan 已提交
1374
        tscDebug("consumer:%" PRId64 ", (epoch %d) receive offset of vgId:%d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
1375
                 vgKey);
L
Liu Jicong 已提交
1376
      }
S
Shengliang Guan 已提交
1377
      tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
X
Xiaoyu Wang 已提交
1378 1379
      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1380
          .currentOffset = offset,
X
Xiaoyu Wang 已提交
1381 1382 1383
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
1384
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
1385 1386 1387 1388
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
1389
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
1390
  }
L
Liu Jicong 已提交
1391
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
L
Liu Jicong 已提交
1392
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1393
  tmq->clientTopics = newTopics;
1394

1395 1396 1397 1398
  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1399

X
Xiaoyu Wang 已提交
1400 1401 1402
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}
L
Liu Jicong 已提交
1403
#endif
X
Xiaoyu Wang 已提交
1404

D
dapan1121 已提交
1405
/*
 * Response callback for the ask-endpoint request sent by tmqAskEp.
 *
 * param is the SMqAskEpCbParam built by tmqAskEp. The response code is stored
 * in pParam->code. A response whose epoch is not newer than the consumer epoch
 * is ignored. Otherwise:
 *  - synchronous request: the response is decoded and applied right here with
 *    tmqUpdateEp2;
 *  - asynchronous request: the decoded response is wrapped in a
 *    SMqAskEpRspWrapper, written to tmq->mqueue and tmq->rspSem is posted.
 *
 * On exit a synchronous request wakes the waiter through pParam->rspSem (the
 * waiter frees pParam); an asynchronous request destroys the semaphore and
 * frees pParam here.
 *
 * Returns the response code, or -1 if the wrapper could not be allocated.
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = pParam->tmq;
  int8_t           async = pParam->async;
  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp2(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  // The response buffer is released by the callback, as tmqPollCb does on every path.
  // NOTE(review): assumes the decoded response keeps no pointers into pMsg->pData - confirm.
  if (pMsg->pData) taosMemoryFree(pMsg->pData);

  if (!async) {
    // pParam must not be touched after this post: the waiter in tmqAskEp frees it
    tsem_post(&pParam->rspSem);
  } else {
    // nobody waits on the semaphore in async mode; destroy it before its storage is freed
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1458
/*
 * Asks the mnode for the current topic/vgroup endpoints of this consumer.
 *
 * A SMqAskEpReq carrying the consumer id, the consumer epoch (both in network
 * byte order) and the group id is sent with tmqAskEpCb as callback.
 *
 * async == false: blocks until the callback has run and returns the response
 *                 code recorded by it.
 * async == true:  returns 0 right after the request is handed to the transport;
 *                 the callback owns and frees the parameter block.
 *
 * Returns -1 if the request, the callback parameter, its semaphore or the send
 * descriptor cannot be set up.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = 0;

  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  // bounded copy; the buffer was zero-allocated and tstrncpy terminates the result
  tstrncpy(req->cgroup, tmq->groupId, sizeof(req->cgroup));

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    // without a working semaphore the synchronous wait below could never be woken
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // in sync mode the callback only posts the semaphore; cleanup of pParam happens here
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1527 1528
#if 0
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
L
Liu Jicong 已提交
1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1542
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1543 1544 1545 1546 1547 1548 1549
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
L
Liu Jicong 已提交
1550
#endif
L
Liu Jicong 已提交
1551

1552
/*
 * Allocates and fills the poll request for one vgroup of one topic.
 *
 * The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". The request
 * carries the consumer's id, epoch, snapshot and table-name settings, the
 * vgroup's current offset, the given timeout and a freshly generated request
 * id. The message head holds the vgroup id and the request size, both in
 * network byte order.
 *
 * Returns the zero-initialized and filled request, or NULL if it cannot be
 * allocated.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pPollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pPollReq == NULL) {
    return NULL;
  }

  // message head
  pPollReq->head.vgId = htonl(pVg->vgId);
  pPollReq->head.contLen = htonl(sizeof(SMqPollReq));

  // subscription key: group id, separator, topic name (strcpy writes the terminator)
  int32_t prefixLen = strlen(tmq->groupId);
  memcpy(pPollReq->subKey, tmq->groupId, prefixLen);
  pPollReq->subKey[prefixLen] = TMQ_SEPARATOR;
  strcpy(&pPollReq->subKey[prefixLen + 1], pTopic->topicName);

  // consumer settings
  pPollReq->consumerId = tmq->consumerId;
  pPollReq->epoch = tmq->epoch;
  pPollReq->withTbName = tmq->withTbName;
  pPollReq->useSnapshot = tmq->useSnapshot;

  // per-request values
  pPollReq->timeout = timeout;
  pPollReq->reqOffset = pVg->currentOffsetNew;
  pPollReq->reqId = generateRequestId();

  return pPollReq;
}

L
Liu Jicong 已提交
1592 1593
/*
 * Builds the result object for a meta response.
 *
 * Topic name, db name and vgroup id are taken from the handles stored in the
 * wrapper; the decoded meta response is copied in as is.
 *
 * Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if it
 * cannot be allocated.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1603 1604 1605
/*
 * Builds the result object for a data response.
 *
 * Topic name, db name and vgroup id are taken from the handles stored in the
 * wrapper; the decoded data response is copied in as is. The block iterator
 * starts at -1, the row counter at 0 and the precision at milliseconds. When
 * the response does not carry its own schema, the schema of the topic is
 * installed in the result info.
 *
 * Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if it
 * cannot be allocated.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1621
// Sends one asynchronous consume (poll) request to every vgroup of every subscribed topic
// that is currently idle.
//
// A vgroup is claimed by atomically switching its status from IDLE to WAIT; vgroups that
// are not idle are skipped and their skip counter is incremented. The response is delivered
// through tmqPollCb.
//
// Returns 0 when all idle vgroups were handled, or -1 as soon as a request, its callback
// parameter or its send-info cannot be allocated. On failure the vgroup is put back to IDLE
// and rspSem is posted before returning.
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      // claim the vgroup; anything other than IDLE means a request is already outstanding
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      pParam->vgId = pVg->vgId;
      pParam->epoch = tmq->epoch;

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      int64_t transporterId = 0;

      // human-readable form of the requested offset, used for logging only
      char offsetFormatBuf[80];
      tFormatOffset(offsetFormatBuf, sizeof(offsetFormatBuf), &pVg->currentOffsetNew);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);

      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1698 1699
// Processes a queue item that is not a poll response.
//
// Only endpoint (EP) responses are understood. An EP response whose epoch is newer than the
// consumer's current epoch is applied through tmqUpdateEp2 and *pReset is set to true; an
// EP response that is not newer sets *pReset to false.
//
// Returns 0 for EP responses and -1 for any other type (in which case *pReset is untouched).
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    // stale or duplicate endpoint information: nothing to apply
    *pReset = false;
    return 0;
  }

  SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
  tmqUpdateEp2(tmq, rspWrapper->epoch, &pEpWrapper->msg);
  *pReset = true;
  return 0;
}

L
Liu Jicong 已提交
1716
// Drains queued responses until one of them produces a result for the caller.
//
// Items are taken from tmq->qall; when it is empty it is refilled once from tmq->mqueue.
//  - END_RSP:        sets terrno to TSDB_CODE_TQ_NO_COMMITTED_OFFSET and returns NULL.
//  - POLL_RSP:       on matching epoch, records the response offset on the vgroup, marks
//                    the vgroup IDLE and returns an SMqRspObj (empty responses are dropped).
//  - POLL_META_RSP:  on matching epoch, records the response offset as a log offset, marks
//                    the vgroup IDLE and returns an SMqMetaRspObj.
//  - anything else:  handed to tmqHandleNoPollRsp; when that reports a reset and
//                    pollIfReset is true, a new round of polls is sent.
// Responses whose epoch differs from the consumer's epoch are discarded.
//
// Returns the built result object, or NULL when both queues are empty or END_RSP was seen.
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      // local batch exhausted: pull everything pending from the shared queue and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        return NULL;
      }
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPoll->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", pPoll->dataRsp.head.epoch,
                 curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      // update the offset first, then release the vgroup for the next poll
      SMqClientVg* pVgroup = pPoll->vgHandle;
      pVgroup->currentOffsetNew = pPoll->dataRsp.rspOffset;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->dataRsp.blockNum == 0) {
        // nothing to hand to the caller
        taosFreeQitem(pPoll);
        continue;
      }

      SMqRspObj* pDataObj = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pDataObj;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPoll->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", pPoll->metaRsp.head.epoch,
                 curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVgroup = pPoll->vgHandle;
      pVgroup->currentOffsetNew.version = pPoll->metaRsp.rspOffset;
      pVgroup->currentOffsetNew.type = TMQ_OFFSET__LOG;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaObj = tmqBuildMetaRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pMetaObj;
    }

    // every remaining type goes through the non-poll handler
    bool reset = false;
    tmqHandleNoPollRsp(tmq, pItem, &reset);
    taosFreeQitem(pItem);
    if (pollIfReset && reset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1786
// Polls the subscribed topics and returns the next available result.
//
// tmq:     consumer handle.
// timeout: maximum time to wait in milliseconds (compared against taosGetTimestampMs
//          differences); -1 waits indefinitely.
//
// Each round runs the delayed tasks, sends poll requests to idle vgroups and drains the
// response queue. If nothing is available the call sleeps on rspSem and retries.
//
// Returns a result object, or NULL when the consumer is still in INIT status, when sending
// poll requests fails, when terrno is TSDB_CODE_TQ_NO_COMMITTED_OFFSET after draining the
// queue, or when the timeout expires.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int64_t startTime = taosGetTimestampMs();

  // a consumer that is still in INIT status returns immediately without polling
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    void* rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsedTime = taosGetTimestampMs() - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // wait only for the time that is still left, not for the time already spent;
      // the * 1000 scaling is kept from the existing tsem_timewait call sites
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1830
// Closes a consumer.
//
// When the consumer is in READY status, the current offsets are committed synchronously
// and the consumer then subscribes to an empty topic list.
//
// Returns 0 on success, or the non-zero code reported by the commit or the subscribe call.
int32_t tmq_consumer_close(tmq_t* tmq) {
  // read the status atomically, as tmq_consumer_poll does
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    // subscribing to an empty list
    tmq_list_t* lst = tmq_list_new();
    rsp = tmq_subscribe(tmq, lst);
    tmq_list_destroy(lst);

    if (rsp != 0) {
      return rsp;
    }
  }
  // TODO: free resources
  return 0;
}
L
Liu Jicong 已提交
1848

L
Liu Jicong 已提交
1849 1850
// Maps a tmq return code to a message: 0 -> "success", -1 -> "fail",
// any other value is passed to tstrerror.
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1858

L
Liu Jicong 已提交
1859 1860 1861 1862 1863 1864 1865 1866 1867 1868
// Classifies a result handle: TMQ_RES_DATA for a data result, TMQ_RES_TABLE_META for a
// meta result, TMQ_RES_INVALID for anything else.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1869
// Returns the topic name of a tmq result with everything up to and including the first
// '.' stripped. If the stored name contains no '.', the stored name is returned unchanged.
// Returns NULL when res is neither a data nor a meta result.
//
// The returned pointer refers to storage inside res.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr returns NULL when there is no '.'; adding 1 to that would yield an invalid pointer
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1881 1882 1883 1884
// Returns the database name of a tmq result with everything up to and including the first
// '.' stripped. If the stored name contains no '.', the stored name is returned unchanged.
// Returns NULL when res is neither a data nor a meta result.
//
// The returned pointer refers to storage inside res.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr returns NULL when there is no '.'; adding 1 to that would yield an invalid pointer
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1893 1894 1895 1896
// Returns the vgroup id stored in a data or meta result, or -1 for any other handle.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1904 1905 1906 1907 1908 1909 1910 1911

// Returns the table name of the block the result iterator currently points at.
// Returns NULL when res is not a data result, when the response carries no table names,
// or when the iterator is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pObj = (SMqRspObj*)res;
  bool       hasNames = pObj->rsp.withTbName && pObj->rsp.blockTbName != NULL;
  bool       iterInRange = pObj->resIter >= 0 && pObj->resIter < pObj->rsp.blockNum;
  if (!hasNames || !iterInRange) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pObj->rsp.blockTbName, pObj->resIter);
}
1916

1917
// Returns a newly allocated descriptor of the raw meta payload held by a meta result.
//
// The descriptor's raw_meta pointer refers to the buffer held by res (it is not copied);
// raw_meta_len and raw_meta_type are copied from the meta response.
//
// Returns NULL when res is not a meta result or when the descriptor cannot be allocated.
tmq_raw_data* tmq_get_raw_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  tmq_raw_data* raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
  if (raw == NULL) {
    // allocation failed: do not write through a NULL pointer
    return NULL;
  }

  SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
  raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
  raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
  raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType;
  return raw;
}

1929 1930
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
                                  int8_t t) {
wmmhello's avatar
wmmhello 已提交
1931 1932 1933 1934 1935 1936 1937
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
wmmhello's avatar
wmmhello 已提交
1938

1939 1940 1941 1942
  //  char uid[32] = {0};
  //  sprintf(uid, "%"PRIi64, id);
  //  cJSON* id_ = cJSON_CreateString(uid);
  //  cJSON_AddItemToObject(json, "id", id_);
wmmhello's avatar
wmmhello 已提交
1943 1944 1945 1946
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
  cJSON_AddItemToObject(json, "tableType", tableType);
1947 1948
  //  cJSON* version = cJSON_CreateNumber(1);
  //  cJSON_AddItemToObject(json, "version", version);
wmmhello's avatar
wmmhello 已提交
1949 1950

  cJSON* columns = cJSON_CreateArray();
1951 1952 1953 1954
  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON*   column = cJSON_CreateObject();
    SSchema* s = schemaRow->pSchema + i;
    cJSON*   cname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1955 1956 1957
    cJSON_AddItemToObject(column, "name", cname);
    cJSON* ctype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(column, "type", ctype);
1958
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1959
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1960
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1961
      cJSON_AddItemToObject(column, "length", cbytes);
1962 1963 1964
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1965 1966
      cJSON_AddItemToObject(column, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1967 1968 1969 1970 1971
    cJSON_AddItemToArray(columns, column);
  }
  cJSON_AddItemToObject(json, "columns", columns);

  cJSON* tags = cJSON_CreateArray();
1972 1973 1974 1975
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON*   tag = cJSON_CreateObject();
    SSchema* s = schemaTag->pSchema + i;
    cJSON*   tname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1976 1977 1978
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(tag, "type", ttype);
1979
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1980
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1981
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1982
      cJSON_AddItemToObject(tag, "length", cbytes);
1983 1984 1985
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1986 1987
      cJSON_AddItemToObject(tag, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1988 1989 1990 1991 1992 1993 1994 1995 1996
    cJSON_AddItemToArray(tags, tag);
  }
  cJSON_AddItemToObject(json, "tags", tags);

  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  return string;
}

1997 1998 1999 2000
// Serialises an "alter super table" request to JSON.
//
// alterData / alterDataLen: serialised SMAlterStbReq.
//
// The output always carries "type", "tableName", "tableType" ("super") and "alterType".
// Depending on the alter type it also carries "colName", "colType", "colLength" (BINARY
// and NCHAR only) and "colNewName".
//
// Returns the unformatted JSON text, or NULL when the request cannot be deserialised or
// the root object cannot be created.
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  char*         string = NULL;

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto end;
  }
  cJSON_AddItemToObject(json, "type", cJSON_CreateString("alter"));

  SName name = {0};
  tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  cJSON_AddItemToObject(json, "tableName", cJSON_CreateString(name.tname));
  cJSON_AddItemToObject(json, "tableType", cJSON_CreateString("super"));
  cJSON_AddItemToObject(json, "alterType", cJSON_CreateNumber(req.alterType));

  switch (req.alterType) {
    // adding a field and changing a field's size emit the same name/type/length triple
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(pField->name));
      cJSON_AddItemToObject(json, "colType", cJSON_CreateNumber(pField->type));

      if (pField->type == TSDB_DATA_TYPE_BINARY) {
        int32_t length = pField->bytes - VARSTR_HEADER_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      } else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (pField->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(pField->name));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      // field 0 is the current name, field 1 the new one
      TAOS_FIELD* pOld = taosArrayGet(req.pFields, 0);
      TAOS_FIELD* pNew = taosArrayGet(req.pFields, 1);
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(pOld->name));
      cJSON_AddItemToObject(json, "colNewName", cJSON_CreateString(pNew->name));
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

end:
  cJSON_Delete(json);
  tFreeSMAltertbReq(&req);
  return string;
}
}

2089
// Decodes a create-super-table request from a meta response (the payload follows an
// SMsgHead) and serialises it to JSON through buildCreateTableJson.
// Returns the JSON text, or NULL when decoding fails.
static char* processCreateStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  char*          json = NULL;

  // skip the message head to reach the encoded request
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, body, bodyLen);

  if (tDecodeSVCreateStbReq(&coder, &req) >= 0) {
    json = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
  }

  tDecoderClear(&coder);
  return json;
}

2111
// Decodes an alter-super-table request from a meta response (the payload follows an
// SMsgHead) and serialises the original alter data it carries through buildAlterSTableJson.
// Returns the JSON text, or NULL when decoding fails.
static char* processAlterStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  char*          json = NULL;

  // skip the message head to reach the encoded request
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, body, bodyLen);

  if (tDecodeSVCreateStbReq(&coder, &req) >= 0) {
    json = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
  }

  tDecoderClear(&coder);
  return json;
}

2133 2134
// Serialises a "create child table" description to JSON.
//
// pTag:    encoded tag values of the child table.
// sname:   name of the super table (emitted as "using").
// name:    name of the child table.
// tagName: array of tag names, indexed like the decoded tag values.
// id:      table uid (currently not emitted).
//
// "tags" holds one object per tag with "name", "type" and "value". A JSON tag is emitted
// as a single entry of type TSDB_DATA_TYPE_JSON; an empty JSON tag yields an empty array.
// If the tag values cannot be decoded, or a value buffer cannot be allocated, the tags
// collected so far are emitted.
//
// Returns the unformatted JSON text, or NULL when the root object cannot be created.
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id) {
  char*   string = NULL;
  SArray* pTagVals = NULL;
  cJSON*  json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);

  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("child");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* usingStb = cJSON_CreateString(sname);
  cJSON_AddItemToObject(json, "using", usingStb);

  cJSON*  tags = cJSON_CreateArray();
  int32_t code = tTagToValArray(pTag, &pTagVals);
  if (code) {
    goto end;
  }

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      goto end;
    }
    char*  pJson = parseTagDatatoJson(pTag);
    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, 0);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    cJSON_AddItemToObject(tag, "type", ttype);
    cJSON* tvalue = cJSON_CreateString(pJson);
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
    taosMemoryFree(pJson);
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);

    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, i);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
    cJSON_AddItemToObject(tag, "type", ttype);

    // text form of the value: variable-length types use nData + 1 bytes, others 32 bytes
    char* buf = NULL;
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
    } else {
      buf = taosMemoryCalloc(32, 1);
    }
    if (buf == NULL) {
      // out of memory: drop the half-built entry (not yet owned by `tags`) and stop
      cJSON_Delete(tag);
      goto end;
    }
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
    } else {
      dataConverToStr(buf, pTagVal->type, &pTagVal->i64, tDataTypes[pTagVal->type].bytes, NULL);
    }

    cJSON* tvalue = cJSON_CreateString(buf);
    taosMemoryFree(buf);
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
  }

end:
  cJSON_AddItemToObject(json, "tags", tags);
  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  taosArrayDestroy(pTagVals);
  return string;
}

2221
// Decodes a batch create-table request from a meta response (the payload follows an
// SMsgHead) and serialises one of its entries to JSON.
//
// Only a single JSON text is returned: the one for the last entry in the batch that is a
// child table or a normal table. Earlier entries are not serialised, because their text
// would be overwritten and lost.
//
// Returns the JSON text, or NULL when decoding fails or no entry is a child/normal table.
static char* processCreateTable(SMqMetaRsp* metaRsp) {
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  char*              string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  // walk backwards and stop at the first (i.e. last in batch order) supported entry
  for (int32_t iReq = req.nReqs - 1; iReq >= 0; iReq--) {
    SVCreateTbReq* pCreateReq = req.pReqs + iReq;
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
                                     pCreateReq->ctb.tagName, pCreateReq->uid);
      break;
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
      string =
          buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
      break;
    }
  }

_exit:
  // single cleanup point for both the success and the failure path
  tDecoderClear(&decoder);
  return string;
}

2253 2254 2255 2256
// Decodes an alter-table request from a meta response (the payload follows an SMsgHead)
// and serialises it to JSON.
//
// The output always carries "type" ("alter"), "tableName", "tableType" ("child" for a tag
// value update, otherwise "normal") and "alterType". Depending on the action it also
// carries "colName", "colType", "colLength", "colNewName", "colValue" and "colValueNull".
//
// Returns the unformatted JSON text, or NULL when decoding fails or memory cannot be
// allocated.
static char* processAlterTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  char*        string = NULL;
  cJSON*       json = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  cJSON_AddItemToObject(json, "alterType", alterType);

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      cJSON_AddItemToObject(json, "colType", colType);

      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      cJSON_AddItemToObject(json, "colType", colType);
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      cJSON_AddItemToObject(json, "colName", tagName);

      // a JSON tag with no entries is reported as a null value
      bool isNull = vAlterTbReq.isNull;
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true);
          buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
        } else {
          buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 1, 1);
          if (buf == NULL) {
            // out of memory: give up instead of converting into a NULL buffer
            goto _exit;
          }
          dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
        }

        cJSON* colValue = cJSON_CreateString(buf);
        cJSON_AddItemToObject(json, "colValue", colValue);
        taosMemoryFree(buf);
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      cJSON_AddItemToObject(json, "colValueNull", isNullCJson);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

_exit:
  // the printed text is an independent buffer, so the tree can be released here;
  // json is NULL when decoding or object creation failed
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2366 2367 2368 2369
/*
 * Decode a drop-super-table request carried in a meta response and render it
 * as a JSON string of the form
 *   {"type":"drop","tableName":<req.name>,"tableType":"super"}
 *
 * The encoded request is read from metaRsp->metaRsp after skipping the leading
 * SMsgHead.
 *
 * Returns the string produced by cJSON_PrintUnformatted(), or NULL if decoding
 * fails or the JSON object cannot be created. Ownership of the returned string
 * passes to the caller.
 */
static char* processDropSTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  cJSON*       json = NULL;  // declared up front so every exit path can release it
  char*        string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(req.name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate allocation, so the tree itself must be
  // released here; cJSON_Delete() accepts NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2397 2398 2399 2400
/*
 * Decode a batch drop-table request carried in a meta response and render it
 * as a JSON string of the form
 *   {"type":"drop","tableNameList":[<name>, ...]}
 * with one array entry per request in the batch.
 *
 * The encoded request is read from metaRsp->metaRsp after skipping the leading
 * SMsgHead.
 *
 * Returns the string produced by cJSON_PrintUnformatted(), or NULL if decoding
 * fails or the JSON object cannot be created. Ownership of the returned string
 * passes to the caller.
 */
static char* processDropTable(SMqMetaRsp* metaRsp) {
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  cJSON*           json = NULL;  // declared up front so every exit path can release it
  char*            string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);

  cJSON* tableNameList = cJSON_CreateArray();
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;

    cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
    cJSON_AddItemToArray(tableNameList, tableName);
  }
  cJSON_AddItemToObject(json, "tableNameList", tableNameList);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate allocation, so the tree itself must be
  // released here; cJSON_Delete() accepts NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2437
/*
 * Convert the meta message held by a TMQ meta result into a JSON string.
 *
 * Returns NULL when `res` is not a TMQ meta result or when its resMsgType is
 * not one of the six handled create/alter/drop message types; otherwise
 * returns whatever the matching process* helper returns.
 */
char* tmq_get_json_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  SMqMetaRsp* pRsp = &((SMqMetaRspObj*)res)->metaRsp;

  switch (pRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      return processCreateStb(pRsp);
    case TDMT_VND_ALTER_STB:
      return processAlterStb(pRsp);
    case TDMT_VND_DROP_STB:
      return processDropSTable(pRsp);
    case TDMT_VND_CREATE_TABLE:
      return processCreateTable(pRsp);
    case TDMT_VND_ALTER_TABLE:
      return processAlterTable(pRsp);
    case TDMT_VND_DROP_TABLE:
      return processDropTable(pRsp);
    default:
      return NULL;
  }
}

2459
/*
 * Release a JSON meta string handed out to the caller
 * (presumably one returned by tmq_get_json_meta -- confirm against the API docs).
 */
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
wmmhello's avatar
wmmhello 已提交
2460

2461
/*
 * Replay a raw create-super-table meta message on the given connection.
 *
 * The message (an SMsgHead followed by an encoded SVCreateStbReq) is decoded,
 * converted into an SMCreateStbReq for the connection's current database and
 * sent as TDMT_MND_CREATE_STB with igExists set and source TD_REQ_FROM_TAOX.
 *
 * taos     connection handle; dereferenced as an int64_t id for buildRequest
 * meta     raw message buffer, starting with SMsgHead
 * metaLen  total length of `meta` in bytes
 *
 * Returns TSDB_CODE_SUCCESS or an error code: the buildRequest() failure code,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 * TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
 * allocation failure, or pRequest->code after the query was launched.
 */
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateStbReq req = {0};
  // Zero-initialized because `end` calls tDecoderClear() even on paths that
  // leave before tDecoderInit() has run.
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
  if (NULL == pReq.pColumns) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema* pSchema = req.schemaRow.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pColumns, &field);
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  if (NULL == pReq.pTags) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = req.suid;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // Two-pass serialization: the first call only reports the required length.
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}

2542
/*
 * Replay a raw drop-super-table meta message on the given connection.
 *
 * The message (an SMsgHead followed by an encoded SVDropStbReq) is decoded,
 * converted into an SMDropStbReq for the connection's current database and
 * sent as TDMT_MND_DROP_STB with igNotExists set and source TD_REQ_FROM_TAOX.
 *
 * taos     connection handle; dereferenced as an int64_t id for buildRequest
 * meta     raw message buffer, starting with SMsgHead
 * metaLen  total length of `meta` in bytes
 *
 * Returns TSDB_CODE_SUCCESS or an error code: the buildRequest() failure code,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 * TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
 * allocation failure, or pRequest->code after the query was launched.
 */
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropStbReq req = {0};
  // Zero-initialized because `end` calls tDecoderClear() even on paths that
  // leave before tDecoderInit() has run.
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.suid = req.suid;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // Two-pass serialization: the first call only reports the required length.
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}

// Per-vgroup bucket of create-table requests. taosCreateTable() stores one of
// these per vgId in a hash map and hands the map to
// serializeVgroupsCreateTableBatch().
// NOTE(review): the serializer is defined elsewhere and may rely on this exact
// field layout -- confirm before reordering members.
typedef struct SVgroupCreateTableBatch {
  SVCreateTbBatchReq req;                       // req.pArray collects the SVCreateTbReq entries for this vgroup
  SVgroupInfo        info;                      // vgroup the tables hash to
  char               dbName[TSDB_DB_NAME_LEN];  // database name copied from the request
} SVgroupCreateTableBatch;

static void destroyCreateTbReqBatch(void* data) {
2610
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
wmmhello's avatar
wmmhello 已提交
2611 2612 2613
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2614 2615 2616 2617 2618 2619 2620
/*
 * Replay a raw batch create-table meta message on the given connection.
 *
 * The message (an SMsgHead followed by an encoded SVCreateTbBatchReq) is
 * decoded, each create request is routed to the vgroup its table name hashes
 * to, the per-vgroup batches are serialized and the result is launched as a
 * TDMT_VND_CREATE_TABLE scheduled query.
 *
 * taos     connection handle; dereferenced as an int64_t id for buildRequest
 * meta     raw message buffer, starting with SMsgHead
 * metaLen  total length of `meta` in bytes
 *
 * Returns TSDB_CODE_SUCCESS or an error code: a failure code from
 * buildRequest/catalog/rewrite calls, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the
 * request has no database, TSDB_CODE_INVALID_PARA when decoding fails,
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure, or pRequest->code after the
 * query was launched.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  // group the create requests by the vgroup each table hashes to
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      strcpy(tBatch.dbName, pRequest->pDb);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pCreateReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pCreateReq);
    }
  }

  SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): on the failure paths below pBufArray is not released here;
  // its element destructor is not visible in this file -- confirm ownership.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

// Per-vgroup bucket of drop-table requests. taosDropTable() stores one of
// these per vgId in a hash map and hands the map to
// serializeVgroupsDropTableBatch().
// NOTE(review): the serializer is defined elsewhere and may rely on this exact
// field layout -- confirm before reordering members.
typedef struct SVgroupDropTableBatch {
  SVDropTbBatchReq req;                       // req.pArray collects the SVDropTbReq entries for this vgroup
  SVgroupInfo      info;                      // vgroup the tables hash to
  char             dbName[TSDB_DB_NAME_LEN];  // not filled in by taosDropTable(); stays zeroed there
} SVgroupDropTableBatch;

static void destroyDropTbReqBatch(void* data) {
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2727 2728 2729 2730 2731 2732 2733
/*
 * Replay a raw batch drop-table meta message on the given connection.
 *
 * The message (an SMsgHead followed by an encoded SVDropTbBatchReq) is
 * decoded, every drop request gets igNotExists set and is routed to the vgroup
 * its table name hashes to, the per-vgroup batches are serialized and the
 * result is launched as a TDMT_VND_DROP_TABLE scheduled query.
 *
 * taos     connection handle; dereferenced as an int64_t id for buildRequest
 * meta     raw message buffer, starting with SMsgHead
 * metaLen  total length of `meta` in bytes
 *
 * Returns TSDB_CODE_SUCCESS or an error code: a failure code from
 * buildRequest/catalog/rewrite calls, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the
 * request has no database, TSDB_CODE_INVALID_PARA when decoding fails,
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure, or pRequest->code after the
 * query was launched.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  // group the drop requests by the vgroup each table hashes to
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pDropReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pDropReq);
    }
  }

  SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): on the failure paths below pBufArray is not released here;
  // its element destructor is not visible in this file -- confirm ownership.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

2828 2829 2830 2831 2832 2833 2834 2835
/*
 * Replay a raw alter-table meta message on the given connection.
 *
 * The message (an SMsgHead followed by an encoded SVAlterTbReq) is decoded to
 * find the target table, the vgroup for that table is looked up, and a copy of
 * the whole raw message -- with the head's vgId rewritten to that vgroup -- is
 * launched as a TDMT_VND_ALTER_TABLE scheduled query.
 *
 * Requests whose action is TSDB_ALTER_TABLE_UPDATE_OPTIONS are skipped and
 * report success. A TSDB_CODE_VND_TABLE_NOT_EXIST result from the query is
 * also mapped to success.
 *
 * taos     connection handle; dereferenced as an int64_t id for buildRequest
 * meta     raw message buffer, starting with SMsgHead
 * metaLen  total length of `meta` in bytes
 *
 * Returns 0 / TSDB_CODE_SUCCESS or an error code: a failure code from
 * buildRequest/catalog/rewrite calls, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the
 * request has no database, TSDB_CODE_INVALID_PARA when decoding fails,
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure, or pRequest->code after the
 * query was launched.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       coder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);

  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pArray = taosArrayInit(1, sizeof(void*));
  if (NULL == pArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == pVgData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pVgData->vg = pInfo;
  pVgData->pData = taosMemoryMalloc(metaLen);
  if (NULL == pVgData->pData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // forward the raw message unchanged except for the target vgroup id
  memcpy(pVgData->pData, meta, metaLen);
  ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
  pVgData->size = metaLen;
  pVgData->numOfTables = 1;
  taosArrayPush(pArray, &pVgData);

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  // after launch these are no longer freed here
  pQuery = NULL;  // no need to free in the end
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
    code = 0;
  }

end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

2934
/*
 * Apply a raw meta message to the database behind `taos`, dispatching on
 * raw_meta->raw_meta_type to the matching replay routine.
 *
 * Returns TSDB_CODE_INVALID_PARA when either argument is NULL or the message
 * type is not handled; otherwise the result of the selected routine.
 */
int32_t taos_write_raw_meta(TAOS* taos, tmq_raw_data* raw_meta) {
  if (taos == NULL || raw_meta == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  switch (raw_meta->raw_meta_type) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB:
      // alter-stb messages go through the create path as well
      return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_DROP_STB:
      return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_CREATE_TABLE:
      return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_ALTER_TABLE:
      return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    case TDMT_VND_DROP_TABLE:
      return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
    default:
      return TSDB_CODE_INVALID_PARA;
  }
}

2955 2956
/*
 * Release a tmq_raw_data descriptor. Only the descriptor itself is freed
 * here; nothing it points to is released by this function.
 */
void tmq_free_raw_meta(tmq_raw_data* rawMeta) {
  taosMemoryFreeClear(rawMeta);
}

L
Liu Jicong 已提交
2960
/*
 * Commit through tmqCommitInner2, forwarding the completion callback `cb` and
 * its `param`. The result of the inner call is not reported to the caller.
 * NOTE(review): the literal arguments 0 and 1 presumably mean
 * "not automatic" and "async" -- confirm against tmqCommitInner2.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}

2965 2966 2967 2968
/*
 * Commit through tmqCommitInner2 without a callback and return its result.
 * NOTE(review): the literal arguments 0 and 0 presumably mean
 * "not automatic" and "not async" -- confirm against tmqCommitInner2.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL);
  return code;
}