tmq.c 100.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
/* Forward declaration; the definition is not part of this section of the file. */
int32_t tmqAskEp(tmq_t* tmq, bool async);

/* State shared by every consumer created in this process. */
typedef struct {
  int8_t inited;  // 0 until tmq_consumer_new() wins the compare-exchange and sets up `timer`
  tmr_h  timer;   // handle returned by taosTmrInit(); passed to every taosTmrStart/taosTmrReset below
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
/*
 * Leading fields shared by the response wrappers in this file; queue items are
 * read back through this type when only the header is needed.
 */
typedef struct {
  int8_t  tmqRspType;  // discriminator, e.g. TMQ_MSG_TYPE__END_RSP
  int32_t epoch;       // consumer epoch attached to the response
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60 61 62
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
  int8_t  ssEnable;
  int32_t ssBatchSize;

  bool hbBgEnable;

63 64 65 66 67
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
68
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
69
  void*          commitCbUserParam;
L
Liu Jicong 已提交
70 71 72
};

struct tmq_t {
L
Liu Jicong 已提交
73
  // conf
74 75 76 77 78 79 80 81 82 83 84
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
85 86
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
87 88 89 90

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
91 92
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
93
  int32_t epSkipCnt;
L
Liu Jicong 已提交
94
#endif
L
Liu Jicong 已提交
95 96
  int64_t pollCnt;

L
Liu Jicong 已提交
97
  // timer
98 99
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
100 101 102
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
103 104 105 106
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
107
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
108
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
109
  STaosQall*  qall;
L
Liu Jicong 已提交
110 111 112 113
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
114 115
};

X
Xiaoyu Wang 已提交
116 117 118 119 120 121 122 123
/* Values for the per-vgroup status field. */
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT = 1,
};

/* Values stored in tmq_t.status. */
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY = 1,
  TMQ_CONSUMER_STATUS__NO_TOPIC = 2,
  TMQ_CONSUMER_STATUS__RECOVER = 3,
};

L
Liu Jicong 已提交
128
/* Task types written to tmq_t.delayedTask by the timer callbacks. */
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT = 2,
  TMQ_DELAYED_TASK__COMMIT = 3,
};

L
Liu Jicong 已提交
134
typedef struct {
135 136 137
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
138 139
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
140
  // connection info
141
  int32_t vgId;
X
Xiaoyu Wang 已提交
142
  int32_t vgStatus;
L
Liu Jicong 已提交
143
  int32_t vgSkipCnt;
144 145 146
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
147
typedef struct {
148
  // subscribe info
L
Liu Jicong 已提交
149
  char* topicName;
L
Liu Jicong 已提交
150
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
151 152 153

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
154
  SSchemaWrapper schema;
155 156
} SMqClientTopic;

L
Liu Jicong 已提交
157 158 159 160 161
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
162
  union {
L
Liu Jicong 已提交
163 164
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
165
  };
L
Liu Jicong 已提交
166 167
} SMqPollRspWrapper;

L
Liu Jicong 已提交
168
typedef struct {
L
Liu Jicong 已提交
169 170 171
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
172
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
173

L
Liu Jicong 已提交
174
typedef struct {
175
  tmq_t*  tmq;
L
Liu Jicong 已提交
176
  int32_t code;
L
Liu Jicong 已提交
177
  int32_t async;
X
Xiaoyu Wang 已提交
178
  tsem_t  rspSem;
179 180
} SMqAskEpCbParam;

L
Liu Jicong 已提交
181
typedef struct {
L
Liu Jicong 已提交
182 183
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
184
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
185
  int32_t         epoch;
L
Liu Jicong 已提交
186
  int32_t         vgId;
L
Liu Jicong 已提交
187
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
188
} SMqPollCbParam;
189

190
typedef struct {
L
Liu Jicong 已提交
191 192 193
  tmq_t*         tmq;
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
194 195
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
196
  int32_t        rspErr;
197
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
198 199 200 201
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
202 203 204 205 206 207 208
} SMqCommitCbParamSet;

/* Per-request callback argument built in tmqSendCommitReq(). */
typedef struct {
  SMqCommitCbParamSet* params;   // set shared with the other requests of the same commit
  STqOffset*           pOffset;  // offset that was encoded into this request
} SMqCommitCbParam2;

209
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
210
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
211
  conf->withTbName = false;
L
Liu Jicong 已提交
212
  conf->autoCommit = true;
L
Liu Jicong 已提交
213
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
214
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
215 216 217
  return conf;
}

L
Liu Jicong 已提交
218
/* Releases a configuration and the connection strings it owns; a NULL conf is ignored. */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  if (conf->ip != NULL) taosMemoryFree(conf->ip);
  if (conf->user != NULL) taosMemoryFree(conf->user);
  if (conf->pass != NULL) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
228 229
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
230
    return TMQ_CONF_OK;
231
  }
L
Liu Jicong 已提交
232

233 234
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
235 236
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
237

L
Liu Jicong 已提交
238 239
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
240
      conf->autoCommit = true;
L
Liu Jicong 已提交
241 242
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
243
      conf->autoCommit = false;
L
Liu Jicong 已提交
244 245 246 247
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
248
  }
L
Liu Jicong 已提交
249

L
Liu Jicong 已提交
250 251 252 253 254
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
255 256 257 258 259 260 261 262 263 264 265 266 267 268
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
269

270 271
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
272
      conf->withTbName = true;
L
Liu Jicong 已提交
273
      return TMQ_CONF_OK;
274
    } else if (strcmp(value, "false") == 0) {
275
      conf->withTbName = false;
L
Liu Jicong 已提交
276
      return TMQ_CONF_OK;
277 278 279 280 281
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
282
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
283
    if (strcmp(value, "true") == 0) {
284 285 286 287 288 289 290 291 292 293 294 295 296
      conf->ssEnable = true;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
      conf->ssEnable = false;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
297 298
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
299
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
300 301 302 303
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
304
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
305 306
  }

L
Liu Jicong 已提交
307
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
308
    conf->ssBatchSize = atoi(value);
L
Liu Jicong 已提交
309 310 311
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
312
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
313 314 315
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
316
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
317 318 319
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
320
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
321 322 323
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
324
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
325 326 327
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
328
  if (strcmp(key, "td.connect.db") == 0) {
329
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
330 331 332
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
333
  return TMQ_CONF_UNKNOWN;
334 335 336
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
337 338
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
339 340
}

L
Liu Jicong 已提交
341 342
/*
 * Appends a copy of `src` to the list. Names that do not start with a
 * backquote are lower-cased.
 *
 * Returns 0 on success and -1 when an argument is NULL, `src` is empty, or
 * memory cannot be allocated; nothing is leaked on failure. The list owns the
 * copy, which tmq_list_destroy() releases.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = strdup(src);
  if (topic == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  if (taosArrayPush(&list->container, &topic) == NULL) {
    // the array did not take ownership
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
352
/* Frees every topic name stored in the list and then the list itself. */
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
357 358 359 360 361 362 363 364 365 366
/* Returns the number of topics currently in the list. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* topics = &list->container;
  int32_t       count = taosArrayGetSize(topics);
  return count;
}

/*
 * Exposes the list's backing storage as an array of C strings. The pointer
 * aliases the list's own buffer; the caller does not own it.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
367 368 369 370
/*
 * Writes "<topicName>:<vg>" into dst and returns the number of characters
 * written, not counting the terminator. dst must be large enough; no bound
 * is enforced.
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

D
dapan1121 已提交
371
/*
 * Reply callback for one commit request built by tmqSendCommitReq().
 *
 * Decrements the in-flight counter of the shared param set. When the last
 * reply arrives, a synchronous commit is released by posting rspSem, and an
 * asynchronous commit is reported through the consumer's commitCb (automatic
 * commits) or the caller-supplied userCb. Always returns 0.
 *
 * NOTE(review): `pBuf` and `code` are not used here, and nothing in this
 * function stores `code` into rspErr or frees the param set for async
 * commits - confirm whether that is intended.
 */
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pCbParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pSet = (SMqCommitCbParamSet*)pCbParam->params;

  // count down waiting rsp
  int32_t remaining = atomic_sub_fetch_32(&pSet->waitingRspNum, 1);
  ASSERT(remaining >= 0);
  if (remaining != 0) return 0;

  // this was the last outstanding reply
  if (!pSet->async) {
    tsem_post(&pSet->rspSem);
    return 0;
  }

  if (pSet->automatic) {
    if (pSet->tmq->commitCb) {
      pSet->tmq->commitCb(pSet->tmq, pSet->rspErr, pSet->tmq->commitCbUserParam);
    }
  } else if (pSet->userCb) {
    pSet->userCb(pSet->tmq, pSet->rspErr, pSet->userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
412 413 414 415 416 417
/*
 * Builds and sends one TDMT_VND_MQ_COMMIT_OFFSET request carrying pVg's
 * current offset for (consumer group, topic).
 *
 * On success the request is handed to asyncSendMsgToServer(), the counters
 * in pParamSet are incremented, and 0 is returned. On failure -1 is
 * returned, nothing has been sent, pParamSet is untouched, and every
 * allocation made here has been released.
 *
 * NOTE(review): after a successful send the STqOffset stays referenced from
 * the callback parameter, and paramFreeFp only frees the parameter itself -
 * confirm who releases the offset.
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  void*              buf = NULL;
  SMqCommitCbParam2* pParam = NULL;
  SMsgSendInfo*      pMsgSendInfo = NULL;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffset;

  // subKey = <groupId> TMQ_SEPARATOR <topicName>
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }

  // acquire everything up front so one cleanup path covers all failures
  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (buf == NULL || pParam == NULL || pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb2;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // account for the request before it can possibly be answered
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  if (pMsgSendInfo != NULL) taosMemoryFree(pMsgSendInfo);
  if (pParam != NULL) taosMemoryFree(pParam);
  if (buf != NULL) taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

/*
 * Commits the offset of the single (topic, vgroup) that `msg` came from.
 *
 * msg      - a data or meta result object; any other kind yields
 *            TSDB_CODE_TMQ_INVALID_MSG.
 * async    - zero: block until the reply arrives and return its code.
 *            non-zero: return immediately; the outcome is delivered through
 *            the commit callback.
 * userCb / userParam - callback for async commits; userCb may be NULL.
 *
 * Returns 0 when there was nothing to commit or the request was issued.
 * When the request cannot be built, an async caller is told through userCb
 * and gets 0, while a sync caller gets -1.
 *
 * NOTE(review): for async commits the param set is still referenced by the
 * in-flight request when this function returns, and nothing visible here
 * frees it afterwards.
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = -1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      // only commit when the offset moved since the last commit
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
        }
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    // nothing was sent, so nobody else references the param set
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    // posting the semaphore is the callback's last access, so the set can go now
    taosMemoryFree(pParamSet);
    return code;
  }
  return 0;

FAIL:
  // tmqSendCommitReq() fails before sending, so the param set is unshared
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  if (async) {
    if (userCb != NULL) userCb(tmq, code, userParam);
    return 0;
  }
  return code;
}

L
Liu Jicong 已提交
549 550
/*
 * Commits consumer offsets.
 *
 * With a non-NULL `msg` only that message's vgroup is committed (see
 * tmqCommitMsgImpl). Otherwise a commit request is sent for every vgroup of
 * every subscribed topic whose current offset differs from the committed one.
 *
 * automatic - non-zero when triggered by the auto-commit timer; the reply is
 *             then reported through the consumer's own commitCb.
 * async     - zero: wait for all replies and return the recorded result.
 *             non-zero: return 0 right after the requests are issued.
 *
 * Returns -1 with terrno set if the bookkeeping structure cannot be
 * allocated. Vgroups whose request cannot be built are skipped.
 *
 * NOTE(review): for async commits the param set is still referenced by the
 * in-flight requests when this function returns, and nothing visible here
 * frees it afterwards.
 */
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        // the 64-bit values are printed with PRId64; "%ld" is wrong where long is 32 bits
        tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
      }
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // nothing was sent, so nobody else references the param set
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (async) {
    // the outcome is reported by tmqCommitCb2() once the last reply arrives
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  // posting the semaphore is the callback's last access, so the set can go now
  taosMemoryFree(pParamSet);
  return code;
}

623
void tmqAssignAskEpTask(void* param, void* tmrId) {
L
Liu Jicong 已提交
624
  tmq_t*  tmq = (tmq_t*)param;
625
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
626
  *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
L
Liu Jicong 已提交
627
  taosWriteQitem(tmq->delayedTask, pTaskType);
628
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
629 630 631 632
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
633
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
634 635
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
636
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
637 638 639 640
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
641
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
642 643
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
644
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
645 646
}

647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686
/* Heartbeat reply callback: discards the reply payload, if any, and returns 0. */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) return 0;
  if (pMsg->pData != NULL) {
    taosMemoryFree(pMsg->pData);
  }
  return 0;
}

/*
 * Timer callback that sends one heartbeat (consumer id + epoch) to the mnode
 * and re-arms itself to fire again in 1000 ms.
 *
 * `param` is the tmq_t*; `tmrId` is unused. If either allocation fails, this
 * round's heartbeat is skipped but the timer is still re-armed.
 */
void tmqSendHbReq(void* param, void* tmrId) {
  // TODO replace with ref
  tmq_t* tmq = (tmq_t*)param;

  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
    // must not fall through: sendInfo is dereferenced below
    goto OVER;
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
  taosTmrReset(tmqSendHbReq, 1000, tmq, tmqMgmt.timer, &tmq->hbLiveTimer);
}

L
Liu Jicong 已提交
687 688 689 690 691 692 693 694
/*
 * Drains tmq->delayedTask and runs every queued task:
 *   ASK_EP - refresh the endpoint list asynchronously, re-arm the ep timer (1000 ms);
 *   COMMIT - start an automatic async commit, re-arm the commit timer
 *            (autoCommitInterval);
 *   REPORT - currently a no-op.
 *
 * Returns 0, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the temporary
 * qall cannot be allocated; queued tasks then stay in the queue.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    // reset on every iteration so an empty qall is seen as NULL
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);
      taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do yet
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
711
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
712
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
713 714 715 716 717 718 719 720
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
721
  msg = NULL;
L
Liu Jicong 已提交
722 723 724 725 726 727 728 729 730 731
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
732
/*
 * Reply callback for the subscribe request: stores the result code in the
 * shared parameter and posts its semaphore, which tmq_subscribe() is blocked
 * on. Always returns 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pCbParam = (SMqSubscribeCbParam*)param;

  pCbParam->rspErr = code;
  tsem_post(&pCbParam->rspSem);
  return 0;
}
739

L
Liu Jicong 已提交
740
/*
 * Appends the names of the currently subscribed topics to *topics, creating
 * the list first when *topics is NULL.
 *
 * Stored topic names are expected to contain a '.'; only the part after the
 * first '.' is reported. A name without one is reported unchanged.
 *
 * Returns 0 on success, -1 if `topics` is NULL, the list cannot be created,
 * or any name could not be appended (the remaining names are still tried).
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (topics == NULL) return -1;

  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) return -1;
  }

  int32_t code = 0;
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // never compute NULL + 1 when the separator is missing
    const char* sep = strchr(topic->topicName, '.');
    const char* name = (sep != NULL) ? sep + 1 : topic->topicName;

    if (tmq_list_append(*topics, name) < 0) {
      code = -1;
    }
  }
  return code;
}

L
Liu Jicong 已提交
751 752 753
/*
 * Drops all subscriptions by subscribing to an empty topic list.
 * Returns the result of tmq_subscribe(), or -1 if the temporary list cannot
 * be created.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) return -1;

  int32_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
758
#if 0
/*
 * Disabled earlier variant of tmq_consumer_new() that took an existing
 * connection. It is compiled out and refers to members (commit_cb, epStatus,
 * epSkipCnt) that the live structures above no longer provide.
 */
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  tsem_init(&pTmq->rspSem, 0, 0);

  return pTmq;
}
#endif
L
Liu Jicong 已提交
792

L
Liu Jicong 已提交
793
/*
 * Creates a consumer from `conf`.
 *
 * Steps: lazily set up the process-wide timer, allocate the consumer and its
 * queues, copy the configuration, create the response semaphore, open a
 * dedicated connection (user/pass fall back to TSDB_DEFAULT_USER /
 * TSDB_DEFAULT_PASS), and, if enabled, start the background heartbeat timer.
 *
 * Returns the consumer, or NULL on failure with everything allocated here
 * released. `errstr` and `errstrLen` are currently unused. `conf` is not
 * consumed; the caller still owns it.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer: only the caller that flips `inited` from 0 to 1 creates it
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here, so log from conf; no consumer id has been assigned yet
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", (int64_t)0, terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  // set conf and assign consumerId first so the error logs below carry id and group
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->ssEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->hbBgEnable = conf->hbBgEnable;
  pTmq->consumerId = tGenIdPI64();

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  if (pTmq->hbBgEnable) {
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
888
/*
 * Replaces the consumer's subscription with `topic_list` (an empty list
 * unsubscribes).
 *
 * Each topic is expanded to its full name, the request is sent to the mnode,
 * and the call blocks until the reply arrives. It then polls tmqAskEp() up
 * to about ten more times, 500 ms apart, while the server reports
 * TSDB_CODE_MND_CONSUMER_NOT_READY. Finally it starts the ask-ep timer and,
 * when auto commit is on, the commit timer, unless they are already running.
 *
 * Returns 0 on success, the server's error code if the subscribe request is
 * rejected, TSDB_CODE_MND_CONSUMER_NOT_READY if the consumer never became
 * ready, and -1 on local failures such as out of memory.
 *
 * NOTE(review): any other non-zero result from tmqAskEp() ends the retry
 * loop and is treated as success, as before - confirm that is intended.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      // the array did not take ownership
      taosMemoryFree(topicFName);
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  if (tlen < 0) goto FAIL;
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  // lives on this stack frame; safe because we block on rspSem until the callback ran
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .tmq = tmq,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // code is 0 at this point; report the failure instead of returning success
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  // both are non-NULL only when the request was never handed to the transport
  if (sendInfo != NULL) taosMemoryFree(sendInfo);
  if (buf != NULL) taosMemoryFree(buf);
  return code;
}

L
Liu Jicong 已提交
989
// Registers the auto-commit callback on a consumer configuration.
//   conf  - configuration object to update
//   cb    - callback stored in conf->commitCb
//   param - opaque user pointer stored in conf->commitCbUserParam
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
994

D
dapan1121 已提交
995
// Response callback for a poll request issued by tmqPollImpl.
//
//   param - SMqPollCbParam allocated by the sender; its fields are copied out and it is freed here
//   pMsg  - response buffer; pMsg->pData is released by this function on every path
//   code  - result code of the request
//
// On success the payload is decoded into a SMqPollRspWrapper queue item, pushed
// onto tmq->mqueue and tmq->rspSem is posted; returns 0.
// A response from an epoch older than the consumer's current epoch is dropped
// (rspSem is still posted) and 0 is returned.
// On any failure the vgroup is put back to IDLE (only if the request epoch is
// still current), rspSem is posted and -1 is returned.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  tmq_t*          tmq = pParam->tmq;
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);

  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    // The payload is released exactly once here; none of the error branches
    // below may free it again.
    if (pMsg->pData) taosMemoryFree(pMsg->pData);

    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }

    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      // Queue an END marker so the polling side can report the condition.
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // The response head tells whether this is a data or a meta response.
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    // The head is copied after decoding, over the start of the decoded struct.
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);

  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;

CREATE_MSG_FAIL:
  // Only release the vgroup when the request still belongs to the current epoch.
  if (epoch == atomic_load_32(&tmq->epoch)) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

L
Liu Jicong 已提交
1085
// Replaces the consumer's topic/vgroup assignment with the one in pRsp.
//
//   tmq   - consumer to update
//   epoch - epoch to store in tmq->epoch once the new assignment is installed
//   pRsp  - decoded ask-ep response listing the topics and their vgroups
//
// The current offset of each already-known "<topic>:<vgId>" pair is carried
// over to the new assignment; unknown pairs start from tmq->resetOffsetCfg.
// The consumer status becomes NO_TOPIC when the new list is empty, READY
// otherwise. Returns true if at least one vgroup was installed; returns false
// without touching the consumer if the temporary containers cannot be allocated.
//
// NOTE(review): the previous tmq->clientTopics is released with
// taosArrayDestroy only; the per-topic topicName and vgs members are not freed
// here - confirm whether that is intentional.
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // Remember the current offset of every vgroup, keyed by "<topic>:<vgId>".
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        // Bounded formatting: vgKey is a fixed-size stack buffer.
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, sizeof(buf), &pVgCur->currentOffset);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  // Build the new topic list from the response.
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1168
// Response callback for the ask-ep request sent by tmqAskEp.
//
//   param - SMqAskEpCbParam created by tmqAskEp
//   pMsg  - response buffer; pMsg->pData starts with a SMqRspHead
//   code  - result code of the request, also stored into pParam->code
//
// A response whose epoch is not newer than the consumer's epoch is ignored.
// Synchronous mode (!async): the assignment is applied right away through
//   tmqUpdateEp and pParam->rspSem is posted so the waiting caller can resume;
//   the caller releases pParam.
// Asynchronous mode: the decoded response is queued on tmq->mqueue as a
//   TMQ_MSG_TYPE__EP_RSP item, tmq->rspSem is posted, and pParam is released here.
//
// The response payload is released before returning, in the same way
// tmqPollCb releases pMsg->pData on all of its paths.
// NOTE(review): this assumes the callback owns pMsg->pData for ask-ep
// responses as it does for poll responses - confirm against the transport layer.
//
// Returns `code`, or -1 (with terrno set) if the queue item cannot be allocated.
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = pParam->tmq;
  int8_t           async = pParam->async;
  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  // `head` points into pData, so the payload is only released once all reads are done.
  if (pMsg->pData) taosMemoryFree(pMsg->pData);

  if (!async) {
    // pParam must not be touched after this post: the waiter frees it.
    tsem_post(&pParam->rspSem);
  } else {
    // Nobody waits on the semaphore in async mode; tear it down with its owner.
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1221
// Asks the mnode for the consumer's current topic/vgroup assignment.
//
//   tmq   - consumer issuing the request
//   async - false: block until tmqAskEpCb has handled the response and return
//                  the code it recorded;
//           true:  return right after the request is handed to the transport;
//                  tmqAskEpCb then owns and releases the callback parameter.
//
// Returns 0 (async) or the response code (sync); -1 if a local resource
// (request buffer, callback parameter, semaphore, send info) cannot be set up.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t      code = 0;
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  // Request fields travel in network byte order.
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  // Same check tmq_subscribe performs on its response semaphore.
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    tscError("failed to init ask ep semaphore");
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // The callback posts rspSem after recording the result in pParam->code.
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1290 1291
#if 0
// Disabled: moves the stored offset of one vgroup of a subscribed topic.
// Fails when the offset belongs to another consumer group or when the
// topic/vgroup pair is not part of the current assignment.
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pTarget = &offset->offset;
  if (strcmp(pTarget->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicCnt; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, pTarget->topicName) != 0) {
      continue;
    }

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId != pTarget->vgId) {
        continue;
      }
      pVg->currentOffset = pTarget->offset;
      tmqClearUnhandleMsg(tmq);
      return TMQ_RESP_ERR__SUCCESS;
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
#endif
L
Liu Jicong 已提交
1314

1315
// Allocates and fills a poll request for one vgroup of one topic.
//
//   tmq     - consumer issuing the poll
//   timeout - value copied into the request's timeout field
//   pTopic  - topic being polled; its name forms the second half of subKey
//   pVg     - target vgroup; supplies the request offset and the vgId
//
// Returns the zero-initialised-then-filled request, or NULL if the allocation
// fails.
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pReq == NULL) return NULL;

  // subKey = "<groupId>" TMQ_SEPARATOR "<topicName>"
  int32_t sepPos = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, sepPos);
  pReq->subKey[sepPos] = TMQ_SEPARATOR;
  strcpy(&pReq->subKey[sepPos + 1], pTopic->topicName);

  pReq->withTbName = tmq->withTbName;
  pReq->timeout = timeout;
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->reqOffset = pVg->currentOffset;
  pReq->reqId = generateRequestId();
  pReq->useSnapshot = tmq->useSnapshot;

  // Message head fields are sent in network byte order.
  pReq->head.vgId = htonl(pVg->vgId);
  pReq->head.contLen = htonl(sizeof(SMqPollReq));

  return pReq;
}

L
Liu Jicong 已提交
1355 1356
// Builds the user-visible meta result object from a queued meta poll response.
//
// Copies the topic name, db name and vgId from the wrapper's handles and the
// whole SMqMetaRsp by value. Returns NULL (terrno = TSDB_CODE_OUT_OF_MEMORY)
// if the object cannot be allocated.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1366 1367 1368
// Builds the user-visible data result object from a queued data poll response.
//
// Copies the topic name, db name and vgId from the wrapper's handles and the
// SMqDataRsp by value (shallow copy), and starts the block iterator at -1.
// When the response carries no schema of its own, the topic's schema is
// installed into the result info. Returns NULL
// (terrno = TSDB_CODE_OUT_OF_MEMORY) if the object cannot be allocated.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1384
// Sends one poll request to every vgroup that is currently idle.
//
// For each vgroup of each client topic the status is switched IDLE -> WAIT by
// compare-and-exchange; vgroups that were not idle are skipped and their skip
// counter is incremented. For the others a request, a callback parameter and a
// send-info are allocated and the request is sent with tmqPollCb as callback.
//
// Returns 0 once all vgroups were visited. If any allocation fails the vgroup
// is set back to IDLE, tmq->rspSem is posted and -1 is returned immediately.
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // Claim the vgroup; leave it alone if a request is already outstanding.
      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__IDLE) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 skipped);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      SMqPollCbParam* pCbParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pCbParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pCbParam->tmq = tmq;
      pCbParam->pVg = pVg;
      pCbParam->pTopic = pTopic;
      pCbParam->vgId = pVg->vgId;
      pCbParam->epoch = tmq->epoch;

      SMsgSendInfo* pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (pSend == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pCbParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      pSend->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      pSend->requestId = pReq->reqId;
      pSend->requestObjRefId = 0;
      pSend->param = pCbParam;
      pSend->fp = tmqPollCb;
      pSend->msgType = TDMT_VND_CONSUME;

      char offsetText[80];
      tFormatOffset(offsetText, 80, &pVg->currentOffset);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetText, pReq->reqId);

      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pSend);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1461 1462
// Handles a queued response that is not a poll response.
//
// Only TMQ_MSG_TYPE__EP_RSP is supported: when its epoch is newer than the
// consumer's, the assignment is applied through tmqUpdateEp and *pReset is set
// to true; otherwise *pReset is set to false. Returns 0 in both cases.
// Any other response type returns -1 and leaves *pReset untouched.
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    *pReset = false;
    return 0;
  }

  SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
  tmqUpdateEp(tmq, rspWrapper->epoch, &pEpWrapper->msg);
  *pReset = true;
  return 0;
}

L
Liu Jicong 已提交
1479
// Drains queued responses until one yields a result for the user.
//
//   tmq         - consumer whose queues are drained
//   timeout     - forwarded to tmqPollImpl when a re-poll is triggered
//   pollIfReset - re-poll after an ep response changed the assignment
//
// Returns a SMqRspObj* (data) or SMqMetaRspObj* (meta) for the first poll
// response of the current epoch that carries content; NULL when both queues
// are empty, or when an END marker is found (terrno is then set to
// TSDB_CODE_TQ_NO_COMMITTED_OFFSET). Responses from another epoch and empty
// data responses are discarded.
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    // Take the next item from qall, refilling it from mqueue when it runs dry.
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
    }

    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pDataWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pDataWrapper->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pDataWrapper->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(pDataWrapper);
        continue;
      }

      // Advance the vgroup offset and make the vgroup pollable again.
      SMqClientVg* pVg = pDataWrapper->vgHandle;
      pVg->currentOffset = pDataWrapper->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pDataWrapper->dataRsp.blockNum == 0) {
        // Nothing to hand out; keep draining.
        taosFreeQitem(pDataWrapper);
        continue;
      }

      SMqRspObj* pDataRsp = tmqBuildRspFromWrapper(pDataWrapper);
      taosFreeQitem(pDataWrapper);
      return pDataRsp;
    }

    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pMetaWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pMetaWrapper->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pMetaWrapper->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(pMetaWrapper);
        continue;
      }

      SMqClientVg* pVg = pMetaWrapper->vgHandle;
      pVg->currentOffset.version = pMetaWrapper->metaRsp.rspOffset;
      pVg->currentOffset.type = TMQ_OFFSET__LOG;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaRsp = tmqBuildMetaRspFromWrapper(pMetaWrapper);
      taosFreeQitem(pMetaWrapper);
      return pMetaRsp;
    }

    // Everything else is handed to the non-poll handler (ep responses).
    bool reset = false;
    tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
    taosFreeQitem(rspWrapper);
    if (pollIfReset && reset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1549
// Polls the subscribed topics for the next result.
//
//   tmq     - consumer handle
//   timeout - maximum time to wait, in the unit of taosGetTimestampMs();
//             -1 means wait without a deadline
//
// Returns a result object, or NULL when the consumer is still in INIT state,
// recovery of the assignment fails, sending the poll requests fails, no
// committed offset exists (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET), or the
// timeout elapses without data.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  // in no topic status also need process delayed task
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  // After a consumer mismatch the assignment has to be fetched again first.
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait for the time that is still left, not for the time already spent;
      // the x1000 scaling matches the fixed 500 * 1000 wait used below.
      tsem_timewait(&tmq->rspSem, (timeout - elapsed) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1604
// Closes a consumer.
//
// When the consumer is in READY state the offsets are committed synchronously
// (a commit failure is returned as is) and the consumer then subscribes to an
// empty topic list, retrying while the mnode reports
// TSDB_CODE_MND_CONSUMER_NOT_READY. The result of that subscribe call is not
// propagated. In every other state nothing is done. Returns 0 unless the
// commit failed.
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    // TODO: free resources
    return 0;
  }

  int32_t rsp = tmq_commit_sync(tmq, NULL);
  if (rsp != 0) {
    return rsp;
  }

  // Subscribing to an empty list drops the current subscription.
  tmq_list_t* emptyTopics = tmq_list_new();
  for (int32_t attempt = 0;; attempt++) {
    rsp = tmq_subscribe(tmq, emptyTopics);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || attempt > 5) {
      break;
    }
    taosMsleep(500);
  }
  tmq_list_destroy(emptyTopics);

  return 0;
}
L
Liu Jicong 已提交
1631

L
Liu Jicong 已提交
1632 1633
// Maps a tmq return code to a message: 0 -> "success", -1 -> "fail",
// anything else is delegated to tstrerror().
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1641

L
Liu Jicong 已提交
1642 1643 1644 1645
// Classifies a result returned by tmq_consumer_poll.
//
// Data results are TMQ_RES_DATA. Meta results are TMQ_RES_TABLE_META, except
// when the meta message type is TDMT_VND_DELETE, which is also reported as
// TMQ_RES_DATA. Anything else is TMQ_RES_INVALID.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (!TD_RES_TMQ_META(res)) {
    return TMQ_RES_INVALID;
  }

  SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
  return (pMeta->metaRsp.resMsgType == TDMT_VND_DELETE) ? TMQ_RES_DATA : TMQ_RES_TABLE_META;
}

L
Liu Jicong 已提交
1656
// Returns the topic name of a tmq result with everything up to and including
// the first '.' stripped from the stored name.
//
// If the stored name contains no '.', the whole name is returned instead of
// computing a pointer from a NULL strchr() result. Returns NULL when `res` is
// neither a tmq data result nor a tmq meta result. The returned pointer refers
// to storage inside `res`.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1668 1669 1670 1671
// Returns the database name stored in a polled result, without the leading
// qualifier that precedes the first '.' in the stored name.
//
// Returns NULL when `res` is neither a TMQ data result nor a TMQ meta result,
// or when the stored name contains no '.' separator. The returned pointer
// points into the `db` field of `res`; it is not a copy.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr() yields NULL when no '.' is present; computing NULL + 1 would be
  // undefined behaviour, so report "no name" instead.
  const char* sep = strchr(fullName, '.');
  if (sep == NULL) {
    return NULL;
  }
  return sep + 1;
}

L
Liu Jicong 已提交
1680 1681 1682 1683
// Returns the vgId recorded in a polled TMQ data or meta result, or -1 when
// `res` is neither kind of result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1691 1692 1693 1694 1695 1696 1697 1698

// Returns the entry of rsp.blockTbName at index resIter for a TMQ data result.
//
// Returns NULL when `res` is not a TMQ data result, when the response was
// produced without table names (withTbName unset or blockTbName missing), or
// when resIter lies outside [0, rsp.blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pObj = (SMqRspObj*)res;

  bool hasNames = pObj->rsp.withTbName && pObj->rsp.blockTbName != NULL;
  bool iterInRange = pObj->resIter >= 0 && pObj->resIter < pObj->rsp.blockNum;
  if (!(hasNames && iterInRange)) {
    return NULL;
  }

  return (const char*)taosArrayGetP(pObj->rsp.blockTbName, pObj->resIter);
}
1703

1704 1705
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
                                  int8_t t) {
wmmhello's avatar
wmmhello 已提交
1706 1707 1708 1709 1710 1711 1712
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
wmmhello's avatar
wmmhello 已提交
1713

1714 1715 1716 1717
  //  char uid[32] = {0};
  //  sprintf(uid, "%"PRIi64, id);
  //  cJSON* id_ = cJSON_CreateString(uid);
  //  cJSON_AddItemToObject(json, "id", id_);
wmmhello's avatar
wmmhello 已提交
1718 1719 1720 1721
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
  cJSON_AddItemToObject(json, "tableType", tableType);
1722 1723
  //  cJSON* version = cJSON_CreateNumber(1);
  //  cJSON_AddItemToObject(json, "version", version);
wmmhello's avatar
wmmhello 已提交
1724 1725

  cJSON* columns = cJSON_CreateArray();
1726 1727 1728 1729
  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON*   column = cJSON_CreateObject();
    SSchema* s = schemaRow->pSchema + i;
    cJSON*   cname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1730 1731 1732
    cJSON_AddItemToObject(column, "name", cname);
    cJSON* ctype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(column, "type", ctype);
1733
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1734
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1735
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1736
      cJSON_AddItemToObject(column, "length", cbytes);
1737 1738 1739
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1740 1741
      cJSON_AddItemToObject(column, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1742 1743 1744 1745 1746
    cJSON_AddItemToArray(columns, column);
  }
  cJSON_AddItemToObject(json, "columns", columns);

  cJSON* tags = cJSON_CreateArray();
1747 1748 1749 1750
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON*   tag = cJSON_CreateObject();
    SSchema* s = schemaTag->pSchema + i;
    cJSON*   tname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1751 1752 1753
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(tag, "type", ttype);
1754
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1755
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1756
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1757
      cJSON_AddItemToObject(tag, "length", cbytes);
1758 1759 1760
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1761 1762
      cJSON_AddItemToObject(tag, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1763 1764 1765 1766 1767 1768 1769 1770 1771
    cJSON_AddItemToArray(tags, tag);
  }
  cJSON_AddItemToObject(json, "tags", tags);

  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  return string;
}

1772 1773 1774 1775
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  char*         string = NULL;
wmmhello's avatar
wmmhello 已提交
1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto end;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
1787 1788
  //  cJSON* uid = cJSON_CreateNumber(id);
  //  cJSON_AddItemToObject(json, "uid", uid);
wmmhello's avatar
wmmhello 已提交
1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800
  SName name = {0};
  tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  cJSON* tableName = cJSON_CreateString(name.tname);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  cJSON* alterType = cJSON_CreateNumber(req.alterType);
  cJSON_AddItemToObject(json, "alterType", alterType);
  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
1801 1802
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
1803 1804 1805 1806
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);

1807
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1808
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
1809
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1810
        cJSON_AddItemToObject(json, "colLength", cbytes);
1811 1812 1813
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1814 1815 1816 1817 1818
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
1819 1820 1821
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
1822 1823 1824 1825
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
1826 1827 1828
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
1829 1830 1831
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);
1832
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1833
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
1834
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1835
        cJSON_AddItemToObject(json, "colLength", cbytes);
1836 1837 1838
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1839 1840 1841 1842 1843
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
1844 1845 1846 1847
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      cJSON*      colName = cJSON_CreateString(oldField->name);
wmmhello's avatar
wmmhello 已提交
1848 1849 1850 1851 1852 1853 1854 1855 1856 1857
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(newField->name);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

1858
end:
wmmhello's avatar
wmmhello 已提交
1859 1860 1861 1862 1863
  cJSON_Delete(json);
  tFreeSMAltertbReq(&req);
  return string;
}

1864
// Decodes the payload that follows the SMsgHead in `metaRsp` as an
// SVCreateStbReq and renders it through buildCreateTableJson() as a
// super-table "create" event. Returns NULL when decoding fails; otherwise
// returns whatever the builder produced.
static char* processCreateStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder;
  char*          json = NULL;

  // The encoded request starts right after the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) >= 0) {
    json = buildCreateTableJson(&stbReq.schemaRow, &stbReq.schemaTag, stbReq.name, stbReq.suid, TSDB_SUPER_TABLE);
  }

  tDecoderClear(&decoder);
  return json;
}

1886
// Decodes the payload that follows the SMsgHead in `metaRsp` as an
// SVCreateStbReq and hands its embedded original alter request
// (alterOriData / alterOriDataLen) to buildAlterSTableJson(). Returns NULL
// when decoding fails; otherwise returns whatever the builder produced.
static char* processAlterStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder;
  char*          json = NULL;

  // The encoded request starts right after the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) >= 0) {
    json = buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen);
  }

  tDecoderClear(&decoder);
  return json;
}

1908
// Builds the JSON text describing the creation of a child table:
//   {"type":"create","tableName":<name>,"tableType":"child",
//    "using":<sname>,"tagNum":<tagNum>,"tags":[{"name","type","value"},...]}
//
// pTag     encoded tag values of the child table
// sname    value emitted under "using"
// name     value emitted under "tableName"
// tagName  array indexed in parallel with the decoded tag values; entry i
//          supplies the "name" of tag i
// id       currently unused; kept for the existing call sites
// tagNum   value emitted under "tagNum"
//
// A JSON-typed tag is emitted as a single entry whose value is the text
// returned by parseTagDatatoJson(); an empty JSON tag yields "tags":[].
// If the tag values cannot be decoded, or an allocation fails part-way, the
// tags collected so far are still emitted.
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL if the
// root object cannot be allocated or printing fails.
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum) {
  (void)id;

  char*   string = NULL;
  SArray* pTagVals = NULL;
  cJSON*  json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }

  cJSON_AddItemToObject(json, "type", cJSON_CreateString("create"));
  cJSON_AddItemToObject(json, "tableName", cJSON_CreateString(name));
  cJSON_AddItemToObject(json, "tableType", cJSON_CreateString("child"));
  cJSON_AddItemToObject(json, "using", cJSON_CreateString(sname));
  cJSON_AddItemToObject(json, "tagNum", cJSON_CreateNumber(tagNum));

  cJSON* tags = cJSON_CreateArray();
  if (tags == NULL) {
    // Without the array every tag object built below would be orphaned.
    goto end;
  }

  int32_t code = tTagToValArray(pTag, &pTagVals);
  if (code) {
    goto end;
  }

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      goto end;
    }

    cJSON* tag = cJSON_CreateObject();
    if (tag == NULL) {
      goto end;
    }

    char* pJson = parseTagDatatoJson(pTag);
    char* ptname = taosArrayGet(tagName, 0);
    cJSON_AddItemToObject(tag, "name", cJSON_CreateString(ptname));
    cJSON_AddItemToObject(tag, "type", cJSON_CreateNumber(TSDB_DATA_TYPE_JSON));
    cJSON_AddItemToObject(tag, "value", cJSON_CreateString(pJson));
    cJSON_AddItemToArray(tags, tag);
    taosMemoryFree(pJson);
    goto end;
  }

  int32_t nTagVals = (int32_t)taosArrayGetSize(pTagVals);
  for (int32_t i = 0; i < nTagVals; i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);

    cJSON* tag = cJSON_CreateObject();
    if (tag == NULL) {
      goto end;
    }

    char* ptname = taosArrayGet(tagName, i);
    cJSON_AddItemToObject(tag, "name", cJSON_CreateString(ptname));
    cJSON_AddItemToObject(tag, "type", cJSON_CreateNumber(pTagVal->type));

    cJSON* tvalue = NULL;
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
      if (!buf) {
        // `tag` has not been attached to `tags` yet, so it must be released
        // here or it (and its children) would leak.
        cJSON_Delete(tag);
        goto end;
      }
      dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
      tvalue = cJSON_CreateString(buf);
      taosMemoryFree(buf);
    } else {
      double val = 0;
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64);
      tvalue = cJSON_CreateNumber(val);
    }

    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
  }

end:
  cJSON_AddItemToObject(json, "tags", tags);
  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  taosArrayDestroy(pTagVals);
  return string;
}

2000
// Decodes the payload that follows the SMsgHead in `metaRsp` as an
// SVCreateTbBatchReq and renders a "create" JSON event for it.
//
// Child tables go through buildCreateCTableJson(), normal tables through
// buildCreateTableJson(); requests of any other type are ignored. When the
// batch holds several requests only the JSON of the last child/normal request
// is returned; the text built for earlier ones is released rather than leaked.
//
// Returns NULL when decoding fails or no request produced any JSON.
static char* processCreateTable(SMqMetaRsp* metaRsp) {
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  char*              string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  // loop over the batched create-table requests
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVCreateTbReq* pCreateReq = req.pReqs + iReq;
    char*          next = NULL;

    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      next = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
                                   pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum);
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
      next =
          buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
    } else {
      continue;
    }

    // The previous text came from cJSON_PrintUnformatted(); release it with
    // cJSON's own deallocator before replacing it.
    cJSON_free(string);
    string = next;
  }

_exit:
  tDecoderClear(&decoder);
  return string;
}

2032 2033 2034 2035
// Decodes the payload that follows the SMsgHead in `metaRsp` as an
// SVAlterTbReq and renders it as the JSON text of an "alter" event:
//   {"type":"alter","tableName":...,"tableType":"child"|"normal",
//    "alterType":N, ...}
// "tableType" is "child" for TSDB_ALTER_TABLE_UPDATE_TAG_VAL and "normal" for
// every other action. Depending on the action the object additionally carries
// "colName", "colType", "colLength", "colNewName", "colValue" and
// "colValueNull".
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when
// decoding fails or a required allocation fails.
static char* processAlterTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;  // declared up front so _exit can always release it
  char*        string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON_AddItemToObject(json, "type", cJSON_CreateString("alter"));
  cJSON_AddItemToObject(json, "tableName", cJSON_CreateString(vAlterTbReq.tbName));
  cJSON_AddItemToObject(
      json, "tableType",
      cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal"));
  cJSON_AddItemToObject(json, "alterType", cJSON_CreateNumber(vAlterTbReq.action));

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.colName));
      cJSON_AddItemToObject(json, "colType", cJSON_CreateNumber(vAlterTbReq.type));

      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.colName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.colName));
      cJSON_AddItemToObject(json, "colType", cJSON_CreateNumber(vAlterTbReq.colModType));

      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.colName));
      cJSON_AddItemToObject(json, "colNewName", cJSON_CreateString(vAlterTbReq.colNewName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.tagName));

      bool isNull = vAlterTbReq.isNull;
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        // A missing or empty JSON tag is reported as a null value.
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag == NULL || jsonTag->nTag == 0) {
          isNull = true;
        }
      }

      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true);
          buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
        } else {
          // Sized like the tag-value buffer in buildCreateCTableJson()
          // (payload + 3). NOTE(review): presumably dataConverToStr() may add
          // surrounding quotes plus the terminator - confirm.
          buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 3, 1);
          if (buf == NULL) {
            goto _exit;
          }
          dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
        }

        cJSON_AddItemToObject(json, "colValue", cJSON_CreateString(buf));
        taosMemoryFree(buf);
      }

      cJSON_AddItemToObject(json, "colValueNull", cJSON_CreateBool(isNull));
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

_exit:
  // cJSON_Delete(NULL) is a no-op, so this is safe on the early-exit paths.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2145 2146 2147 2148
// Decodes the payload that follows the SMsgHead in `metaRsp` as an
// SVDropStbReq and renders it as the JSON text
//   {"type":"drop","tableName":<name>,"tableType":"super"}
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when
// decoding fails or the root object cannot be allocated.
static char* processDropSTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  cJSON*       json = NULL;  // declared up front so _exit can always release it
  char*        string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON_AddItemToObject(json, "type", cJSON_CreateString("drop"));
  cJSON_AddItemToObject(json, "tableName", cJSON_CreateString(req.name));
  cJSON_AddItemToObject(json, "tableType", cJSON_CreateString("super"));

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed text is independent of the tree, so the tree is released
  // here; cJSON_Delete(NULL) is a no-op on the early-exit paths.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2176 2177 2178 2179
// Decodes the payload that follows the SMsgHead in `metaRsp` as an
// SVDropTbBatchReq and renders it as the JSON text
//   {"type":"drop","tableNameList":[<name>, ...]}
// with one array entry per request in the batch.
//
// Returns the string produced by cJSON_PrintUnformatted(), or NULL when
// decoding fails or the root object cannot be allocated.
static char* processDropTable(SMqMetaRsp* metaRsp) {
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  cJSON*           json = NULL;  // declared up front so _exit can always release it
  char*            string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON_AddItemToObject(json, "type", cJSON_CreateString("drop"));

  cJSON* tableNameList = cJSON_CreateArray();
  if (tableNameList != NULL) {
    // Only create name nodes when there is an array to own them.
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      SVDropTbReq* pDropTbReq = req.pReqs + iReq;
      cJSON_AddItemToArray(tableNameList, cJSON_CreateString(pDropTbReq->name));
    }
  }
  cJSON_AddItemToObject(json, "tableNameList", tableNameList);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed text is independent of the tree, so the tree is released
  // here; cJSON_Delete(NULL) is a no-op on the early-exit paths.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2216
// Re-issues a super-table creation described by a raw meta message.
//
// taos     connection handle; it is dereferenced as an int64_t id that is
//          passed to buildRequest()
// meta     raw message: an SMsgHead followed by an encoded SVCreateStbReq
// metaLen  total length of `meta` in bytes, including the head
//
// The decoded schema is converted into an SMCreateStbReq (igExists set,
// source TD_REQ_FROM_TAOX) and sent as TDMT_MND_CREATE_STB through
// launchQueryImpl(). On success the table's cached meta is removed from the
// catalog.
//
// Returns TSDB_CODE_SUCCESS, or an error code: the buildRequest() failure,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when the payload cannot be decoded,
// TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the code left in the
// request by the query.
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateStbReq req = {0};
  // Zero-initialised because `end` calls tDecoderClear() even on the paths
  // that bail out before tDecoderInit() has run.
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
  if (pReq.pColumns == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema* pSchema = req.schemaRow.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pColumns, &field);
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  if (pReq.pTags == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = req.suid;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // First call only measures the serialized size; the second one fills the buffer.
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop any cached meta for this table from the catalog.
    SCatalog* pCatalog = NULL;
    catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveTableMeta(pCatalog, &tableName);
  }

  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}

2304
// Re-issues a super-table drop described by a raw meta message.
//
// taos     connection handle; it is dereferenced as an int64_t id that is
//          passed to buildRequest()
// meta     raw message: an SMsgHead followed by an encoded SVDropStbReq
// metaLen  total length of `meta` in bytes, including the head
//
// The decoded request is converted into an SMDropStbReq (igNotExists set,
// source TD_REQ_FROM_TAOX) and sent as TDMT_MND_DROP_STB through
// launchQueryImpl(). On success the table's cached meta is removed from the
// catalog.
//
// Returns TSDB_CODE_SUCCESS, or an error code: the buildRequest() failure,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when the payload cannot be decoded,
// TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the code left in the
// request by the query.
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropStbReq req = {0};
  // Zero-initialised because `end` calls tDecoderClear() even on the paths
  // that bail out before tDecoderInit() has run.
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.suid = req.suid;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  // First call only measures the serialized size; the second one fills the buffer.
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop any cached meta for this table from the catalog.
    SCatalog* pCatalog = NULL;
    catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveTableMeta(pCatalog, &tableName);
  }

  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}

// Bundles a create-table batch request with a vgroup descriptor and a
// database name.
// NOTE(review): presumably one instance exists per target vgroup and is the
// value type of the vgroup hash map built in taosCreateTable() - confirm.
typedef struct SVgroupCreateTableBatch {
  // Batched create-table request; its pArray member is released by
  // destroyCreateTbReqBatch().
  SVCreateTbBatchReq req;
  // Vgroup descriptor. NOTE(review): presumably the vgroup the batch is sent to - confirm.
  SVgroupInfo        info;
  // Database name, stored inline in a TSDB_DB_NAME_LEN-byte buffer.
  char               dbName[TSDB_DB_NAME_LEN];
} SVgroupCreateTableBatch;

static void destroyCreateTbReqBatch(void* data) {
2379
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
wmmhello's avatar
wmmhello 已提交
2380 2381 2382
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2383 2384 2385 2386 2387 2388 2389
/*
 * Replays a serialized "create table" batch on the connection `taos`.
 *
 * taos    - connection handle; dereferenced as an int64_t id for buildRequest
 * meta    - raw message; the leading SMsgHead is skipped and the rest is decoded as an
 *           SVCreateTbBatchReq
 * metaLen - length of `meta` in bytes, including the SMsgHead
 *
 * Every decoded create request is routed to its vgroup (catalogGetTableHashVgroup), the requests
 * are grouped per vgroup, serialized and executed through launchQueryImpl. On success the cached
 * meta of the touched tables is dropped via removeMeta.
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the first failing step / of the executed request.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // the payload must at least contain the message head that is skipped below
  if (NULL == meta || metaLen < (int32_t)sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  if (NULL == pRequest->tableList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // group every create request by the vgroup its table name hashes to
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
    taosArrayPush(pRequest->tableList, &pName);

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      // first request for this vgroup: start a new batch
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      // bounded copy: dbName is a fixed-size buffer
      snprintf(tBatch.dbName, sizeof(tBatch.dbName), "%s", pRequest->pDb);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pCreateReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pCreateReq);
    }
  }

  SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): pBufArray is not released on the two allocation-failure paths below; confirm
  // its ownership rules before adding a cleanup here.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    removeMeta(pTscObj, pRequest->tableList);
  }

  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

// Per-vgroup accumulator used by taosDropTable: stored by value in a hash keyed by vgId.
typedef struct SVgroupDropTableBatch {
  SVDropTbBatchReq req;                       // req.pArray collects the drop requests routed to this vgroup
  SVgroupInfo      info;                      // vgroup the collected requests are routed to
  char             dbName[TSDB_DB_NAME_LEN];  // not written by taosDropTable (stays zeroed there)
} SVgroupDropTableBatch;

// Hash free callback (see taosHashSetFreeFp in taosDropTable): releases the request array
// owned by one SVgroupDropTableBatch entry.
static void destroyDropTbReqBatch(void* data) {
  taosArrayDestroy(((SVgroupDropTableBatch*)data)->req.pArray);
}

L
Liu Jicong 已提交
2502 2503 2504 2505 2506 2507 2508
/*
 * Replays a serialized "drop table" batch on the connection `taos`.
 *
 * taos    - connection handle; dereferenced as an int64_t id for buildRequest
 * meta    - raw message; the leading SMsgHead is skipped and the rest is decoded as an
 *           SVDropTbBatchReq
 * metaLen - length of `meta` in bytes, including the SMsgHead
 *
 * Every decoded drop request is forced to igNotExists = true, routed to its vgroup, grouped per
 * vgroup, serialized and executed through launchQueryImpl. On success the cached meta of the
 * touched tables is dropped via removeMeta.
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the first failing step / of the executed request.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // the payload must at least contain the message head that is skipped below
  if (NULL == meta || metaLen < (int32_t)sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  if (NULL == pRequest->tableList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // group every drop request by the vgroup its table name hashes to
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    // replaying a drop must not fail when the table is already gone
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    taosArrayPush(pRequest->tableList, &pName);
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      // first request for this vgroup: start a new batch
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pDropReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pDropReq);
    }
  }

  SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // NOTE(review): pBufArray is not released on the two allocation-failure paths below; confirm
  // its ownership rules before adding a cleanup here.
  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    removeMeta(pTscObj, pRequest->tableList);
  }
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

wmmhello's avatar
wmmhello 已提交
2607 2608
// delete from db.tabl where ..       -> delete from tabl where ..
// delete from db    .tabl where ..   -> delete from tabl where ..
L
Liu Jicong 已提交
2609
// static void getTbName(char *sql){
2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637
//  char *ch = sql;
//
//  bool inBackQuote = false;
//  int8_t dotIndex = 0;
//  while(*ch != '\0'){
//    if(!inBackQuote && *ch == '`'){
//      inBackQuote = true;
//      ch++;
//      continue;
//    }
//
//    if(inBackQuote && *ch == '`'){
//      inBackQuote = false;
//      ch++;
//
//      continue;
//    }
//
//    if(!inBackQuote && *ch == '.'){
//      dotIndex ++;
//      if(dotIndex == 2){
//        memmove(sql, ch + 1, strlen(ch + 1) + 1);
//        break;
//      }
//    }
//    ch++;
//  }
//}
wmmhello's avatar
wmmhello 已提交
2638 2639

/*
 * Replays a serialized delete result on the connection `taos` by issuing an equivalent
 * "delete from ... where ts >= skey and ts <= ekey" statement through taos_query.
 *
 * taos    - connection handle passed to taos_query
 * meta    - raw message; the leading SMsgHead is skipped and the rest is decoded as an SDeleteRes
 * metaLen - length of `meta` in bytes, including the SMsgHead
 *
 * Returns TSDB_CODE_SUCCESS when the statement succeeds or the target table does not exist
 * (TSDB_CODE_PAR_TABLE_NOT_EXIST is mapped to success), TSDB_CODE_INVALID_PARA for an
 * undecodable message or an over-long statement, otherwise the query's error code.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  int32_t    code = TSDB_CODE_SUCCESS;

  // the payload must at least contain the message head that is skipped below
  if (NULL == meta || metaLen < (int32_t)sizeof(SMsgHead)) {
    return TSDB_CODE_INVALID_PARA;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  //  getTbName(req.tableFName);
  // Bounded formatting: the table and column names come from the decoded message, so the
  // statement length is not known up front; refuse anything that does not fit.
  char    sql[1024] = {0};
  int32_t sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  if (sqlLen < 0 || sqlLen >= (int32_t)sizeof(sql)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // NOTE(review): debug output on stdout from library code; consider routing through the logger.
  printf("delete sql:%s\n", sql);

  TAOS_RES* res = taos_query(taos, sql);
  if (NULL == res) {
    // no result object to read a code from; fall back to terrno
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
    goto end;
  }
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    // deleting from a table that no longer exists is treated as a no-op
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);

end:
  tDecoderClear(&coder);
  return code;
}

2672 2673 2674 2675 2676 2677 2678 2679
/*
 * Replays a serialized "alter table" message on the connection `taos`.
 *
 * taos    - connection handle; dereferenced as an int64_t id for buildRequest
 * meta    - raw message including its SMsgHead; the part after the head is decoded as an
 *           SVAlterTbReq, and the whole message is copied verbatim into the outgoing vnode request
 * metaLen - length of `meta` in bytes
 *
 * Messages whose action is TSDB_ALTER_TABLE_UPDATE_OPTIONS are skipped (success is returned).
 * TSDB_CODE_VND_TABLE_NOT_EXIST from the executed request is mapped to success. When the request
 * succeeds and carries an execution result, it is applied through handleAlterTbExecRes.
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the first failing step.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       coder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);

  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // the payload must at least contain the message head that is skipped below
  if (NULL == meta || metaLen < (int32_t)sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pArray = taosArrayInit(1, sizeof(void*));
  if (NULL == pArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == pVgData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pVgData->vg = pInfo;
  pVgData->pData = taosMemoryMalloc(metaLen);
  if (NULL == pVgData->pData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // forward the original message unchanged, only re-targeting its head at the resolved vgroup
  memcpy(pVgData->pData, meta, metaLen);
  ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
  pVgData->size = metaLen;
  pVgData->numOfTables = 1;
  taosArrayPush(pArray, &pVgData);

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
  if (NULL == pQuery->pRoot) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // from here on the array and its data block must not be freed by the cleanup below
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

L
Liu Jicong 已提交
2784
typedef struct {
2785
  SVgroupInfo vg;
L
Liu Jicong 已提交
2786 2787
  void*       data;
} VgData;
2788 2789 2790 2791 2792 2793

// Hash free callback (see taosHashSetFreeFp in tmqWriteRaw): releases the buffer held by one
// VgData entry.
static void destroyVgHash(void* data) {
  VgData* pEntry = data;
  taosMemoryFreeClear(pEntry->data);
}

L
Liu Jicong 已提交
2794 2795
/*
 * Writes one raw column-oriented data block into table `tbname` of the connection's current
 * database.
 *
 * taos   - connection handle; dereferenced as an int64_t id for createRequest
 * rows   - number of rows contained in pData
 * pData  - raw block. A header of sizeof(int32_t) + sizeof(uint64_t) +
 *          numOfCols * (sizeof(int16_t) + sizeof(int32_t)) bytes is skipped, followed by
 *          numOfCols int32_t column lengths and then, per column, either an int32_t offset array
 *          (variable-length types) or a null bitmap (fixed-length types) plus the column data.
 *          numOfCols is taken from the table's meta, not from the block.
 * tbname - target table name (without database prefix)
 *
 * The block is converted row by row into a single SSubmitBlk inside an SSubmitReq and sent to the
 * table's vgroup through launchQueryImpl.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  int32_t      code = TSDB_CODE_SUCCESS;
  STableMeta*  pTableMeta = NULL;
  SQuery*      pQuery = NULL;
  SRequestObj* pRequest = NULL;
  SSubmitReq*  subReq = NULL;  // owned here until handed over to the SVgDataBlocks below

  if (NULL == taos || NULL == pData || NULL == tbname || rows < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
  if (!pRequest) {
    uError("WriteRaw:createRequest error request is null");
    code = terrno;
    goto end;
  }

  if (!pRequest->pDb) {
    uError("WriteRaw:not use db");
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  // both destinations are fixed-size buffers: reject names that do not fit instead of overflowing
  if (strlen(pRequest->pDb) >= sizeof(pName.dbname) || strlen(tbname) >= sizeof(pName.tname)) {
    uError("WriteRaw:db or table name too long");
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  strcpy(pName.dbname, pRequest->pDb);
  strcpy(pName.tname, tbname);

  struct SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw: get gatlog error");
    goto end;
  }

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  SVgroupInfo vgData = {0};
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname);
    goto end;
  }

  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
    goto end;
  }
  uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
  uint64_t uid = pTableMeta->uid;
  int32_t  numOfCols = pTableMeta->tableInfo.numOfColumns;

  // accumulate the per-row size figures from the table schema
  uint16_t fLen = 0;
  int32_t  rowSize = 0;
  int16_t  nVar = 0;
  for (int i = 0; i < numOfCols; i++) {
    SSchema* schema = pTableMeta->schema + i;
    fLen += TYPE_BYTES[schema->type];
    rowSize += schema->bytes;
    if (IS_VAR_DATA_TYPE(schema->type)) {
      nVar++;
    }
  }

  int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
                            (int32_t)TD_BITMAP_BYTES(numOfCols - 1);
  int32_t schemaLen = 0;

  // size the request in 64 bits so that a large row count cannot wrap the 32-bit length
  int64_t submitLen = (int64_t)sizeof(SSubmitBlk) + schemaLen + (int64_t)rows * extendedRowSize;
  int64_t totalLen64 = (int64_t)sizeof(SSubmitReq) + submitLen;
  if (totalLen64 > INT32_MAX) {
    uError("WriteRaw:block too large, rows:%d", rows);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  int32_t totalLen = (int32_t)totalLen64;
  subReq = taosMemoryCalloc(1, totalLen);
  if (NULL == subReq) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
  void*       blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
  STSRow*     rowData = POINTER_SHIFT(blkSchema, schemaLen);

  SRowBuilder rb = {0};
  tdSRowInit(&rb, pTableMeta->sversion);
  tdSRowSetTpInfo(&rb, numOfCols, fLen);
  int32_t dataLen = 0;

  // skip the block header, then read the per-column length array
  char*    pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
  int32_t* colLength = (int32_t*)pStart;
  pStart += sizeof(int32_t) * numOfCols;

  SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
  if (NULL == pCol) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // locate offset array / null bitmap and payload of every column inside pData
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
      pCol[i].offset = (int32_t*)pStart;
      pStart += rows * sizeof(int32_t);
    } else {
      pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(rows);
    }

    pCol[i].pData = pStart;
    pStart += colLength[i];
  }

  // transpose the columnar block into rows
  for (int32_t j = 0; j < rows; j++) {
    tdSRowResetBuf(&rb, rowData);
    int32_t offset = 0;
    for (int32_t k = 0; k < numOfCols; k++) {
      const SSchema* pColumn = &pTableMeta->schema[k];

      if (IS_VAR_DATA_TYPE(pColumn->type)) {
        // an offset of -1 marks a NULL value
        if (pCol[k].offset[j] != -1) {
          char* data = pCol[k].pData + pCol[k].offset[j];
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        } else {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        }
      } else {
        if (!colDataIsNull_f(pCol[k].nullbitmap, j)) {
          char* data = pCol[k].pData + pColumn->bytes * j;
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        } else {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        }
      }

      offset += TYPE_BYTES[pColumn->type];
    }
    int32_t rowLen = TD_ROW_LEN(rowData);
    rowData = POINTER_SHIFT(rowData, rowLen);
    dataLen += rowLen;
  }

  taosMemoryFree(pCol);

  blk->uid = htobe64(uid);
  blk->suid = htobe64(suid);
  blk->sversion = htonl(pTableMeta->sversion);
  blk->schemaLen = htonl(schemaLen);
  blk->numOfRows = htonl(rows);
  blk->dataLen = htonl(dataLen);
  subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
  subReq->numOfBlocks = 1;

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    uError("create SQuery error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
  pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (NULL == pQuery->pRoot) {
    uError("create pQuery->pRoot error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
  nodeStmt->payloadType = PAYLOAD_TYPE_KV;
  nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == nodeStmt->pDataBlocks) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == dst) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto end;
  }
  dst->vg = vgData;
  dst->numOfTables = subReq->numOfBlocks;
  dst->size = subReq->length;
  dst->pData = (char*)subReq;
  // host-order values were needed above; convert the wire fields last
  subReq->header.vgId = htonl(dst->vg.vgId);
  subReq->version = htonl(1);
  subReq->header.contLen = htonl(subReq->length);
  subReq->length = htonl(subReq->length);
  subReq->numOfBlocks = htonl(subReq->numOfBlocks);
  subReq = NULL;  // no need free
  taosArrayPush(nodeStmt->pDataBlocks, &dst);

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  taosMemoryFreeClear(subReq);  // only non-NULL when the buffer was never handed over
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}

L
Liu Jicong 已提交
2977 2978 2979 2980
/*
 * Writes a serialized TMQ data response (as produced by tmq_get_raw for data results) into the
 * connection's current database.
 *
 * taos    - connection handle; dereferenced as an int64_t id for createRequest
 * data    - buffer decoded with tDecodeSMqDataRsp
 * dataLen - length of `data` in bytes
 *
 * Every block of the response is converted into an SSubmitBlk using the schema shipped with the
 * block; blocks are appended to one SSubmitReq per target vgroup (kept in a hash keyed by vgId),
 * and all requests are then executed together through launchQueryImpl.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; a response without schema information is rejected
 * with TSDB_CODE_TMQ_INVALID_MSG.
 */
static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SHashObj* pVgHash = NULL;
  SQuery*   pQuery = NULL;
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  terrno = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
  if (!pRequest) {
    uError("WriteRaw:createRequest error request is null");
    return terrno;
  }

  rspObj.resIter = -1;
  rspObj.resType = RES_TYPE__TMQ;

  tDecoderInit(&decoder, data, dataLen);
  code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
  if (code != 0) {
    uError("WriteRaw:decode smqDataRsp error");
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  if (!pRequest->pDb) {
    uError("WriteRaw:not use db");
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (NULL == pVgHash) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgHash, destroyVgHash);
  struct SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw: get gatlog error");
    goto end;
  }

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  printf("raw data block num:%d\n", rspObj.rsp.blockNum);
  while (++rspObj.resIter < rspObj.rsp.blockNum) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
    if (!rspObj.rsp.withSchema) {
      uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
      // without a schema the block cannot be converted; report it instead of returning success
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }
    SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
    setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);

    code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw: setQueryResultFromRsp error");
      goto end;
    }

    // accumulate the per-row size figures from the block's schema
    uint16_t fLen = 0;
    int32_t  rowSize = 0;
    int16_t  nVar = 0;
    for (int i = 0; i < pSW->nCols; i++) {
      SSchema* schema = pSW->pSchema + i;
      fLen += TYPE_BYTES[schema->type];
      rowSize += schema->bytes;
      if (IS_VAR_DATA_TYPE(schema->type)) {
        nVar++;
      }
    }

    int32_t rows = rspObj.resInfo.numOfRows;
    int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
                              (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1);
    int32_t schemaLen = 0;
    int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;

    const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
    if (!tbName) {
      uError("WriteRaw: tbname is null");
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }

    printf("raw data tbname:%s\n", tbName);
    SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
    // both destinations are fixed-size buffers: reject names that do not fit instead of overflowing
    if (strlen(pRequest->pDb) >= sizeof(pName.dbname) || strlen(tbName) >= sizeof(pName.tname)) {
      uError("WriteRaw:db or table name too long");
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }
    strcpy(pName.dbname, pRequest->pDb);
    strcpy(pName.tname, tbName);

    VgData vgData = {0};
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
      goto end;
    }

    SSubmitReq* subReq = NULL;
    SSubmitBlk* blk = NULL;
    void*       hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
    if (hData) {
      // a request for this vgroup already exists: grow it and append the new block at its end
      vgData = *(VgData*)hData;

      int32_t oldLen = ((SSubmitReq*)(vgData.data))->length;
      int32_t totalLen = oldLen + submitLen;
      void*   tmp = taosMemoryRealloc(vgData.data, totalLen);
      if (tmp == NULL) {
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto end;
      }
      // zero the appended region so it starts out like the calloc'ed buffer of a new request
      memset(POINTER_SHIFT(tmp, oldLen), 0, submitLen);
      vgData.data = tmp;
      ((VgData*)hData)->data = tmp;
      subReq = (SSubmitReq*)(vgData.data);
      blk = POINTER_SHIFT(vgData.data, subReq->length);
    } else {
      // first block for this vgroup: start a new request
      int32_t totalLen = sizeof(SSubmitReq) + submitLen;
      void*   tmp = taosMemoryCalloc(1, totalLen);
      if (tmp == NULL) {
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto end;
      }
      vgData.data = tmp;
      taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
      subReq = (SSubmitReq*)(vgData.data);
      subReq->length = sizeof(SSubmitReq);
      subReq->numOfBlocks = 0;

      blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
    }

    // only uid/suid are needed from the table meta
    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName);
      goto end;
    }
    uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
    uint64_t uid = pTableMeta->uid;
    taosMemoryFreeClear(pTableMeta);

    void*   blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
    STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);

    SRowBuilder rb = {0};
    tdSRowInit(&rb, pSW->version);
    tdSRowSetTpInfo(&rb, pSW->nCols, fLen);
    int32_t dataLen = 0;

    for (int32_t j = 0; j < rows; j++) {
      tdSRowResetBuf(&rb, rowData);

      // position resInfo.row[] on the current row, then advance the cursor
      doSetOneRowPtr(&rspObj.resInfo);
      rspObj.resInfo.current += 1;

      int32_t offset = 0;
      for (int32_t k = 0; k < pSW->nCols; k++) {
        const SSchema* pColumn = &pSW->pSchema[k];
        char*          data = rspObj.resInfo.row[k];
        if (!data) {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        } else {
          if (IS_VAR_DATA_TYPE(pColumn->type)) {
            // step back so that the value is passed together with its length header
            data -= VARSTR_HEADER_SIZE;
          }
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        }
        offset += TYPE_BYTES[pColumn->type];
      }
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
      dataLen += rowLen;
    }

    blk->uid = htobe64(uid);
    blk->suid = htobe64(suid);
    blk->sversion = htonl(pSW->version);
    blk->schemaLen = htonl(schemaLen);
    blk->numOfRows = htonl(rows);
    blk->dataLen = htonl(dataLen);
    // length/numOfBlocks stay in host order until all blocks are appended
    subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
    subReq->numOfBlocks++;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    uError("create SQuery error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
  pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (NULL == pQuery->pRoot) {
    uError("create pQuery->pRoot error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
  nodeStmt->payloadType = PAYLOAD_TYPE_KV;

  int32_t numOfVg = taosHashGetSize(pVgHash);
  nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == nodeStmt->pDataBlocks) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // move every per-vgroup request out of the hash into the statement's data block list
  VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
  while (vData) {
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == dst) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto end;
    }
    dst->vg = vData->vg;
    SSubmitReq* subReq = (SSubmitReq*)(vData->data);
    dst->numOfTables = subReq->numOfBlocks;
    dst->size = subReq->length;
    dst->pData = (char*)subReq;
    vData->data = NULL;  // no need free
    subReq->header.vgId = htonl(dst->vg.vgId);
    subReq->version = htonl(1);
    subReq->header.contLen = htonl(subReq->length);
    subReq->length = htonl(subReq->length);
    subReq->numOfBlocks = htonl(subReq->numOfBlocks);
    taosArrayPush(nodeStmt->pDataBlocks, &dst);
    vData = (VgData*)taosHashIterate(pVgHash, vData);
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  tDecoderClear(&decoder);
  taos_free_result(&rspObj);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  return code;
}

wmmhello's avatar
wmmhello 已提交
3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239
/*
 * Renders the meta message carried by a TMQ meta result as a string by dispatching on its
 * message type to the matching process* helper.
 *
 * Returns NULL when `res` is not a TMQ meta result or its message type is not one of the six
 * handled create/alter/drop (super) table types; otherwise whatever the helper returns.
 */
char* tmq_get_json_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  SMqMetaRspObj* pObj = (SMqMetaRspObj*)res;

  if (pObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) {
    return processCreateStb(&pObj->metaRsp);
  }
  if (pObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) {
    return processAlterStb(&pObj->metaRsp);
  }
  if (pObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) {
    return processDropSTable(&pObj->metaRsp);
  }
  if (pObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) {
    return processCreateTable(&pObj->metaRsp);
  }
  if (pObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) {
    return processAlterTable(&pObj->metaRsp);
  }
  if (pObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
    return processDropTable(&pObj->metaRsp);
  }

  // any other message type has no textual representation here
  return NULL;
}

/*
 * Release a string previously handed out to the caller (by name, the
 * counterpart of tmq_get_json_meta). Delegates to taosMemoryFreeClear on the
 * local copy of the pointer; the caller's own pointer variable is not
 * modified because it is passed by value.
 */
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}

L
Liu Jicong 已提交
3240 3241
/*
 * Export the raw payload of a TMQ result into `raw`.
 *
 * - TMQ meta result: `raw` is pointed at the meta response buffer already
 *   held by `res` (no copy is made), and raw_type is set to the response's
 *   resMsgType.
 * - TMQ data result: the data response is serialized with tEncodeSMqDataRsp
 *   into a freshly allocated buffer, and raw_type is set to RES_TYPE__TMQ.
 *   tmq_free_raw() frees the buffer only for this raw_type.
 *
 * Returns:
 *   TSDB_CODE_SUCCESS          on success
 *   TSDB_CODE_INVALID_PARA     if `res` or `raw` is NULL
 *   TSDB_CODE_TMQ_INVALID_MSG  if `res` is neither a TMQ meta nor a TMQ data result
 *   -1                         if sizing, allocating, or encoding the data
 *                              response fails; `raw` is left untouched
 */
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (!raw || !res) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    // Borrowed pointer: the buffer stays owned by the result object.
    raw->raw = pMetaRspObj->metaRsp.metaRsp;
    raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
    raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
  } else if (TD_RES_TMQ(res)) {
    SMqRspObj* rspObj = ((SMqRspObj*)res);

    // First pass: compute the encoded size only.
    int32_t len = 0;
    int32_t code = 0;
    tEncodeSize(tEncodeSMqDataRsp, &rspObj->rsp, len, code);
    if (code < 0) {
      return -1;
    }

    void* buf = taosMemoryCalloc(1, len);
    if (buf == NULL) {
      // Without this check the encoder would be handed a NULL buffer and the
      // caller would receive a NULL payload reported as success.
      return -1;
    }

    // Second pass: encode into the buffer we now own.
    SEncoder encoder = {0};
    tEncoderInit(&encoder, buf, len);
    code = tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
    tEncoderClear(&encoder);
    if (code < 0) {
      taosMemoryFree(buf);
      return -1;
    }

    raw->raw = buf;
    raw->raw_len = len;
    raw->raw_type = RES_TYPE__TMQ;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Release the payload of a tmq_raw_data value.
 *
 * Only payloads tagged RES_TYPE__TMQ are freed; for every other raw_type
 * this function does nothing and leaves raw.raw alone.
 */
void tmq_free_raw(tmq_raw_data raw) {
  if (raw.raw_type != RES_TYPE__TMQ) {
    return;
  }
  taosMemoryFree(raw.raw);
}

L
Liu Jicong 已提交
3280
/*
 * Replay a raw payload against the connection `taos`.
 *
 * The handler is chosen by raw.raw_type and receives (taos, raw.raw,
 * raw.raw_len); its return value is passed straight back to the caller.
 *
 * Returns TSDB_CODE_INVALID_PARA when `taos` is NULL or raw.raw_type is not
 * one of the types handled below.
 */
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  if (!taos) {
    return TSDB_CODE_INVALID_PARA;
  }

  switch (raw.raw_type) {
    case TDMT_VND_CREATE_STB:
      return taosCreateStb(taos, raw.raw, raw.raw_len);
    case TDMT_VND_ALTER_STB:
      // NOTE(review): alter-stb is routed through the create-stb handler;
      // presumably intentional (same request layout) - confirm.
      return taosCreateStb(taos, raw.raw, raw.raw_len);
    case TDMT_VND_DROP_STB:
      return taosDropStb(taos, raw.raw, raw.raw_len);
    case TDMT_VND_CREATE_TABLE:
      return taosCreateTable(taos, raw.raw, raw.raw_len);
    case TDMT_VND_ALTER_TABLE:
      return taosAlterTable(taos, raw.raw, raw.raw_len);
    case TDMT_VND_DROP_TABLE:
      return taosDropTable(taos, raw.raw, raw.raw_len);
    case TDMT_VND_DELETE:
      return taosDeleteData(taos, raw.raw, raw.raw_len);
    case RES_TYPE__TMQ:
      return tmqWriteRaw(taos, raw.raw, raw.raw_len);
    default:
      return TSDB_CODE_INVALID_PARA;
  }
}

L
Liu Jicong 已提交
3305
/*
 * Commit through tmqCommitInner, passing the caller's callback `cb` and its
 * `param` along. The result of tmqCommitInner is not reported to the caller
 * of this function.
 *
 * NOTE(review): the fourth argument is 1 here and 0 in tmq_commit_sync, so it
 * is presumably the "async" flag; the meaning of the third argument (0) is
 * not visible from this block - confirm against tmqCommitInner.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

3310 3311
/*
 * Commit through tmqCommitInner with no callback and no callback parameter,
 * and return its result code to the caller.
 *
 * NOTE(review): the fourth argument is 0 here and 1 in tmq_commit_async, so
 * it is presumably the "async" flag - confirm against tmqCommitInner.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}