tmq.c 100.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
  int8_t inited;
  tmr_h  timer;
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60 61 62
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
  int8_t  ssEnable;
  int32_t ssBatchSize;

  bool hbBgEnable;

63 64 65 66 67
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
68
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
69
  void*          commitCbUserParam;
L
Liu Jicong 已提交
70 71 72
};

struct tmq_t {
L
Liu Jicong 已提交
73
  // conf
74 75 76 77 78 79 80 81 82 83 84
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
85 86
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
87 88 89 90

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
91 92
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
93
  int32_t epSkipCnt;
L
Liu Jicong 已提交
94
#endif
L
Liu Jicong 已提交
95 96
  int64_t pollCnt;

L
Liu Jicong 已提交
97
  // timer
98 99
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
100 101 102
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
103 104 105 106
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
107
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
108
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
109
  STaosQall*  qall;
L
Liu Jicong 已提交
110 111 112 113
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
114 115
};

X
Xiaoyu Wang 已提交
116 117 118 119 120 121 122 123
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
124
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
125
  TMQ_CONSUMER_STATUS__RECOVER,
L
Liu Jicong 已提交
126 127
};

L
Liu Jicong 已提交
128
enum {
129
  TMQ_DELAYED_TASK__ASK_EP = 1,
L
Liu Jicong 已提交
130 131 132 133
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
134
typedef struct {
135 136 137
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
138 139
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
140
  // connection info
141
  int32_t vgId;
X
Xiaoyu Wang 已提交
142
  int32_t vgStatus;
L
Liu Jicong 已提交
143
  int32_t vgSkipCnt;
144 145 146
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
147
typedef struct {
148
  // subscribe info
L
Liu Jicong 已提交
149
  char* topicName;
L
Liu Jicong 已提交
150
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
151 152 153

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
154
  SSchemaWrapper schema;
155 156
} SMqClientTopic;

L
Liu Jicong 已提交
157 158 159 160 161
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
162
  union {
L
Liu Jicong 已提交
163 164
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
165
  };
L
Liu Jicong 已提交
166 167
} SMqPollRspWrapper;

L
Liu Jicong 已提交
168
typedef struct {
L
Liu Jicong 已提交
169 170 171
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
172
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
173

L
Liu Jicong 已提交
174
typedef struct {
175
  tmq_t*  tmq;
L
Liu Jicong 已提交
176
  int32_t code;
L
Liu Jicong 已提交
177
  int32_t async;
X
Xiaoyu Wang 已提交
178
  tsem_t  rspSem;
179 180
} SMqAskEpCbParam;

L
Liu Jicong 已提交
181
typedef struct {
L
Liu Jicong 已提交
182 183
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
184
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
185
  int32_t         epoch;
L
Liu Jicong 已提交
186
  int32_t         vgId;
L
Liu Jicong 已提交
187
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
188
} SMqPollCbParam;
189

190
typedef struct {
L
Liu Jicong 已提交
191 192 193
  tmq_t*         tmq;
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
194 195
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
196
  int32_t        rspErr;
197
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
198 199 200 201
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
202 203 204 205 206
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
207
} SMqCommitCbParam;
208

209
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
210
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
211
  conf->withTbName = false;
L
Liu Jicong 已提交
212
  conf->autoCommit = true;
L
Liu Jicong 已提交
213
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
214
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
215
  conf->hbBgEnable = true;
216 217 218
  return conf;
}

L
Liu Jicong 已提交
219
void tmq_conf_destroy(tmq_conf_t* conf) {
L
Liu Jicong 已提交
220 221 222 223 224 225
  if (conf) {
    if (conf->ip) taosMemoryFree(conf->ip);
    if (conf->user) taosMemoryFree(conf->user);
    if (conf->pass) taosMemoryFree(conf->pass);
    taosMemoryFree(conf);
  }
L
Liu Jicong 已提交
226 227 228
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
229 230
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
231
    return TMQ_CONF_OK;
232
  }
L
Liu Jicong 已提交
233

234 235
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
236 237
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
238

L
Liu Jicong 已提交
239 240
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
241
      conf->autoCommit = true;
L
Liu Jicong 已提交
242 243
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
244
      conf->autoCommit = false;
L
Liu Jicong 已提交
245 246 247 248
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
249
  }
L
Liu Jicong 已提交
250

L
Liu Jicong 已提交
251 252 253 254 255
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
256 257 258 259 260 261 262 263 264 265 266 267 268 269
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
270

271 272
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
273
      conf->withTbName = true;
L
Liu Jicong 已提交
274
      return TMQ_CONF_OK;
275
    } else if (strcmp(value, "false") == 0) {
276
      conf->withTbName = false;
L
Liu Jicong 已提交
277
      return TMQ_CONF_OK;
278 279 280 281 282
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
283
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
284
    if (strcmp(value, "true") == 0) {
285 286 287 288 289 290 291 292 293 294 295 296 297
      conf->ssEnable = true;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
      conf->ssEnable = false;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
298 299
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
300
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
301 302 303 304
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
305
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
306 307
  }

L
Liu Jicong 已提交
308
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
309
    conf->ssBatchSize = atoi(value);
L
Liu Jicong 已提交
310 311 312
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
313
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
314 315 316
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
317
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
318 319 320
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
321
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
322 323 324
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
325
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
326 327 328
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
329
  if (strcmp(key, "td.connect.db") == 0) {
330
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
331 332 333
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
334
  return TMQ_CONF_UNKNOWN;
335 336 337
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
338 339
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
340 341
}

L
Liu Jicong 已提交
342 343
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
344 345 346 347 348
  if (src == NULL || src[0] == 0) return -1;
  char* topic = strdup(src);
  if (topic[0] != '`') {
    strtolower(topic, src);
  }
L
fix  
Liu Jicong 已提交
349
  if (taosArrayPush(container, &topic) == NULL) return -1;
350 351 352
  return 0;
}

L
Liu Jicong 已提交
353
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
354
  SArray* container = &list->container;
L
Liu Jicong 已提交
355
  taosArrayDestroyP(container, taosMemoryFree);
L
Liu Jicong 已提交
356 357
}

L
Liu Jicong 已提交
358 359 360 361 362 363 364 365 366 367
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return container->pData;
}

L
Liu Jicong 已提交
368 369 370 371
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

372 373
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
374 375
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
  // push into array
L
Liu Jicong 已提交
376
#if 0
377 378 379 380 381
  if (code == 0) {
    taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
  } else {
    taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
  }
L
Liu Jicong 已提交
382
#endif
L
Liu Jicong 已提交
383

S
Shengliang Guan 已提交
384
  /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
L
Liu Jicong 已提交
385 386
   * pOffset->version);*/

387
  // count down waiting rsp
L
Liu Jicong 已提交
388
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
389 390 391 392 393 394 395
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    // if no more waiting rsp
    if (pParamSet->async) {
      // call async cb func
      if (pParamSet->automatic && pParamSet->tmq->commitCb) {
L
Liu Jicong 已提交
396
        pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
397 398
      } else if (!pParamSet->automatic && pParamSet->userCb) {
        // sem post
L
Liu Jicong 已提交
399
        pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
400
      }
L
Liu Jicong 已提交
401 402
    } else {
      tsem_post(&pParamSet->rspSem);
403 404
    }

L
Liu Jicong 已提交
405
#if 0
406 407
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
L
Liu Jicong 已提交
408
#endif
409 410 411 412
  }
  return 0;
}

L
Liu Jicong 已提交
413 414 415 416 417 418
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
L
Liu Jicong 已提交
419
  pOffset->val = pVg->currentOffset;
420

L
Liu Jicong 已提交
421 422 423 424
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
L
Liu Jicong 已提交
425

L
Liu Jicong 已提交
426 427 428 429 430 431 432 433 434 435
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    return -1;
  }
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) return -1;
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
436

L
Liu Jicong 已提交
437 438 439 440 441 442 443
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
444
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
445 446 447 448 449 450 451 452 453 454 455 456 457 458
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    return -1;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

S
Shengliang Guan 已提交
459 460
  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);
L
Liu Jicong 已提交
461 462

  // TODO: put into cb
L
Liu Jicong 已提交
463
  pVg->committedOffset = pVg->currentOffset;
L
Liu Jicong 已提交
464 465 466 467

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
L
Liu Jicong 已提交
468
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
469
  pMsgSendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
470 471 472
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
  // send msg

L
Liu Jicong 已提交
473 474 475
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

L
Liu Jicong 已提交
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;
}

int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

L
Liu Jicong 已提交
509 510
  int32_t code = -1;

L
Liu Jicong 已提交
511 512 513 514 515 516 517
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

L
Liu Jicong 已提交
518
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
L
Liu Jicong 已提交
519 520
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
L
Liu Jicong 已提交
521
        }
L
Liu Jicong 已提交
522
        goto HANDLE_RSP;
L
Liu Jicong 已提交
523 524
      }
    }
L
Liu Jicong 已提交
525
  }
L
Liu Jicong 已提交
526

L
Liu Jicong 已提交
527 528 529 530
HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
L
Liu Jicong 已提交
531 532 533
    return 0;
  }

L
Liu Jicong 已提交
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    return code;
  } else {
    code = 0;
  }

FAIL:
  if (code != 0 && async) {
    userCb(tmq, code, userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
550 551
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
L
Liu Jicong 已提交
552 553 554 555 556 557
  int32_t code = -1;

  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

558 559 560 561 562 563 564 565 566 567 568 569 570 571
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
572

S
Shengliang Guan 已提交
573
    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
L
Liu Jicong 已提交
574 575
             (int32_t)taosArrayGetSize(pTopic->vgs));

576
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
577 578
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

S
Shengliang Guan 已提交
579 580
      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);
L
Liu Jicong 已提交
581

L
Liu Jicong 已提交
582 583 584
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer: %ld, vg:%d, current %ld, committed %ld", tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
L
Liu Jicong 已提交
585 586 587
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
588 589 590 591
      }
    }
  }

L
Liu Jicong 已提交
592 593 594 595 596 597
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

598 599 600 601 602 603 604 605 606 607
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
  } else {
    code = 0;
  }

  if (code != 0 && async) {
    if (automatic) {
L
Liu Jicong 已提交
608
      tmq->commitCb(tmq, code, tmq->commitCbUserParam);
609
    } else {
L
Liu Jicong 已提交
610
      userCb(tmq, code, userParam);
611 612 613
    }
  }

L
Liu Jicong 已提交
614
#if 0
615 616 617 618
  if (!async) {
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
  }
L
Liu Jicong 已提交
619
#endif
620 621 622 623

  return 0;
}

624
void tmqAssignAskEpTask(void* param, void* tmrId) {
L
Liu Jicong 已提交
625
  tmq_t*  tmq = (tmq_t*)param;
626
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
627
  *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
L
Liu Jicong 已提交
628
  taosWriteQitem(tmq->delayedTask, pTaskType);
629
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
630 631 632 633
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
634
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
635 636
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
637
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
638 639 640 641
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
642
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
643 644
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
645
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
646 647
}

648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg && pMsg->pData) taosMemoryFree(pMsg->pData);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
  // TODO replace with ref
  tmq_t*    tmq = (tmq_t*)param;
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
  taosTmrReset(tmqSendHbReq, 1000, tmq, tmqMgmt.timer, &tmq->hbLiveTimer);
}

L
Liu Jicong 已提交
688 689 690 691 692 693 694 695
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

696
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
L
Liu Jicong 已提交
697
      tmqAskEp(tmq, true);
698
      taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
L
Liu Jicong 已提交
699
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
L
Liu Jicong 已提交
700
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
L
Liu Jicong 已提交
701 702 703 704 705
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
L
Liu Jicong 已提交
706
    taosFreeQitem(pTaskType);
L
Liu Jicong 已提交
707 708 709 710 711
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
712
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
713
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
714 715 716 717 718 719 720 721
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
722
  msg = NULL;
L
Liu Jicong 已提交
723 724 725 726 727 728 729 730 731 732
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
733
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
L
Liu Jicong 已提交
734 735
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
L
Liu Jicong 已提交
736
  /*tmq_t* tmq = pParam->tmq;*/
L
Liu Jicong 已提交
737 738 739
  tsem_post(&pParam->rspSem);
  return 0;
}
740

L
Liu Jicong 已提交
741
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
X
Xiaoyu Wang 已提交
742 743 744 745
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
L
Liu Jicong 已提交
746
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
747
    tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
X
Xiaoyu Wang 已提交
748
  }
L
Liu Jicong 已提交
749
  return 0;
X
Xiaoyu Wang 已提交
750 751
}

L
Liu Jicong 已提交
752 753 754
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  int32_t     rsp = tmq_subscribe(tmq, lst);
L
Liu Jicong 已提交
755 756
  tmq_list_destroy(lst);
  return rsp;
X
Xiaoyu Wang 已提交
757 758
}

L
Liu Jicong 已提交
759
#if 0
760
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
761
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
762 763 764 765 766 767
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
768
  pTmq->epoch = 0;
L
fix txn  
Liu Jicong 已提交
769
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
770
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
771
  // set conf
772 773
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
774
  pTmq->autoCommit = conf->autoCommit;
775
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
776
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
777

L
Liu Jicong 已提交
778 779 780 781 782 783 784 785 786 787
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
788 789
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
790 791
  return pTmq;
}
L
Liu Jicong 已提交
792
#endif
L
Liu Jicong 已提交
793

L
Liu Jicong 已提交
794
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
L
Liu Jicong 已提交
795 796 797 798 799 800 801 802 803 804 805
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

L
Liu Jicong 已提交
806 807
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
L
Liu Jicong 已提交
808
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
809 810
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
811 812
    return NULL;
  }
L
Liu Jicong 已提交
813

L
Liu Jicong 已提交
814 815 816 817 818
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
L
Liu Jicong 已提交
819
  ASSERT(conf->groupId[0]);
L
Liu Jicong 已提交
820

L
Liu Jicong 已提交
821 822 823 824 825 826
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
L
Liu Jicong 已提交
827
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
828 829
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
830 831
    goto FAIL;
  }
L
Liu Jicong 已提交
832

L
Liu Jicong 已提交
833 834
  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
L
Liu Jicong 已提交
835 836
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
L
Liu Jicong 已提交
837 838
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/
L
Liu Jicong 已提交
839

L
Liu Jicong 已提交
840 841 842
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
843
  pTmq->withTbName = conf->withTbName;
844
  pTmq->useSnapshot = conf->ssEnable;
L
Liu Jicong 已提交
845
  pTmq->autoCommit = conf->autoCommit;
L
Liu Jicong 已提交
846
  pTmq->autoCommitInterval = conf->autoCommitInterval;
L
Liu Jicong 已提交
847 848
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
L
Liu Jicong 已提交
849 850
  pTmq->resetOffsetCfg = conf->resetOffset;

851 852
  pTmq->hbBgEnable = conf->hbBgEnable;

L
Liu Jicong 已提交
853
  // assign consumerId
L
Liu Jicong 已提交
854
  pTmq->consumerId = tGenIdPI64();
X
Xiaoyu Wang 已提交
855

L
Liu Jicong 已提交
856 857
  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
S
Shengliang Guan 已提交
858 859
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
860 861
    goto FAIL;
  }
L
Liu Jicong 已提交
862

L
Liu Jicong 已提交
863 864 865
  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
S
Shengliang Guan 已提交
866 867
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
868 869 870
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }
L
Liu Jicong 已提交
871

872 873 874 875
  if (pTmq->hbBgEnable) {
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
  }

S
Shengliang Guan 已提交
876
  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
L
Liu Jicong 已提交
877

878
  return pTmq;
L
Liu Jicong 已提交
879 880 881 882 883 884 885 886

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
887 888
}

L
Liu Jicong 已提交
889
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
L
Liu Jicong 已提交
890 891 892 893 894
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;
895 896

  req.consumerId = tmq->consumerId;
L
Liu Jicong 已提交
897
  tstrncpy(req.clientId, tmq->clientId, 256);
L
Liu Jicong 已提交
898
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
899
  req.topicNames = taosArrayInit(sz, sizeof(void*));
L
Liu Jicong 已提交
900
  if (req.topicNames == NULL) goto FAIL;
901

L
Liu Jicong 已提交
902 903
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
904 905

    SName name = {0};
L
Liu Jicong 已提交
906
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
907

L
Liu Jicong 已提交
908 909 910
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
911
    }
L
Liu Jicong 已提交
912
    tNameExtractFullName(&name, topicFName);
913

L
Liu Jicong 已提交
914 915 916
    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
917 918
  }

L
Liu Jicong 已提交
919 920 921 922
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

923 924 925
  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

926
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
927
  if (sendInfo == NULL) goto FAIL;
928

X
Xiaoyu Wang 已提交
929
  SMqSubscribeCbParam param = {
L
Liu Jicong 已提交
930
      .rspErr = 0,
X
Xiaoyu Wang 已提交
931 932
      .tmq = tmq,
  };
L
Liu Jicong 已提交
933

L
Liu Jicong 已提交
934 935 936
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
937 938 939 940
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
941

L
Liu Jicong 已提交
942 943
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
944 945
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
L
Liu Jicong 已提交
946 947
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

948 949 950 951 952
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
953 954 955
  // avoid double free if msg is sent
  buf = NULL;

L
Liu Jicong 已提交
956 957
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
958

L
Liu Jicong 已提交
959 960 961
  code = param.rspErr;
  if (code != 0) goto FAIL;

L
Liu Jicong 已提交
962
  int32_t retryCnt = 0;
L
Liu Jicong 已提交
963
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
L
Liu Jicong 已提交
964 965 966
    if (retryCnt++ > 10) {
      goto FAIL;
    }
L
fix  
Liu Jicong 已提交
967
    tscDebug("consumer not ready, retry");
L
Liu Jicong 已提交
968 969
    taosMsleep(500);
  }
970

971 972 973
  // init ep timer
  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer);
974
  }
L
Liu Jicong 已提交
975 976

  // init auto commit timer
977
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
L
Liu Jicong 已提交
978 979 980
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

L
Liu Jicong 已提交
981 982 983
  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
L
Liu Jicong 已提交
984
  if (code != 0 && buf) {
L
Liu Jicong 已提交
985 986 987
    taosMemoryFree(buf);
  }
  return code;
988 989
}

L
Liu Jicong 已提交
990
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
L
Liu Jicong 已提交
991
  //
992
  conf->commitCb = cb;
L
Liu Jicong 已提交
993
  conf->commitCbUserParam = param;
L
Liu Jicong 已提交
994
}
995

D
dapan1121 已提交
996
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
997 998
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
999
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
1000
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
1001 1002 1003
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
1004
  if (code != 0) {
L
Liu Jicong 已提交
1005
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
L
Liu Jicong 已提交
1006
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1007 1008 1009 1010
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
L
Liu Jicong 已提交
1011 1012 1013 1014
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        taosMemoryFree(pMsg->pData);
S
Shengliang Guan 已提交
1015
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
L
Liu Jicong 已提交
1016 1017 1018 1019 1020 1021 1022 1023
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      /*pRspWrapper->vgHandle = pVg;*/
      /*pRspWrapper->topicHandle = pTopic;*/
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
L
fix txn  
Liu Jicong 已提交
1024
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1025 1026
  }

X
Xiaoyu Wang 已提交
1027 1028 1029
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
1030
    // do not write into queue since updating epoch reset
S
Shengliang Guan 已提交
1031
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
1032
            tmqEpoch);
1033
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1034
    taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
1035 1036 1037 1038
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
S
Shengliang Guan 已提交
1039
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
1040 1041
  }

L
Liu Jicong 已提交
1042 1043 1044
  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

1045
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1046
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1047
    taosMemoryFree(pMsg->pData);
S
Shengliang Guan 已提交
1048
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
1049
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1050
  }
L
Liu Jicong 已提交
1051

L
Liu Jicong 已提交
1052
  pRspWrapper->tmqRspType = rspType;
L
Liu Jicong 已提交
1053 1054
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
1055

L
Liu Jicong 已提交
1056
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1057 1058 1059
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
wmmhello's avatar
wmmhello 已提交
1060
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1061
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1062 1063 1064
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
L
Liu Jicong 已提交
1065
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1066
  }
L
Liu Jicong 已提交
1067

L
Liu Jicong 已提交
1068
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1069

S
Shengliang Guan 已提交
1070 1071 1072
  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);
L
fix  
Liu Jicong 已提交
1073

L
Liu Jicong 已提交
1074
  taosWriteQitem(tmq->mqueue, pRspWrapper);
1075
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1076

L
Liu Jicong 已提交
1077
  return 0;
L
fix txn  
Liu Jicong 已提交
1078
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
1079
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
1080 1081
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
1082
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1083
  return -1;
1084 1085
}

L
Liu Jicong 已提交
1086
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
1087 1088 1089 1090
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
S
Shengliang Guan 已提交
1091
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
S
Shengliang Guan 已提交
1110
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
1111 1112 1113
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
L
Liu Jicong 已提交
1114
        char buf[80];
L
Liu Jicong 已提交
1115
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
L
Liu Jicong 已提交
1116 1117
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
L
Liu Jicong 已提交
1118
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129
      }
    }
  }

  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

S
Shengliang Guan 已提交
1130
    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);
1131 1132 1133 1134 1135 1136

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
L
Liu Jicong 已提交
1137 1138
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
1139
      if (pOffset != NULL) {
L
Liu Jicong 已提交
1140
        offsetNew = *pOffset;
1141 1142 1143 1144
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1145
          .currentOffset = offsetNew,
X
Xiaoyu Wang 已提交
1146 1147 1148
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
1149
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
1150 1151 1152 1153
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
1154
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
1155
  }
L
Liu Jicong 已提交
1156
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
L
Liu Jicong 已提交
1157
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1158
  tmq->clientTopics = newTopics;
1159

1160 1161 1162 1163
  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1164

X
Xiaoyu Wang 已提交
1165 1166 1167 1168
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1169
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
1170
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1171
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
1172
  int8_t           async = pParam->async;
L
Liu Jicong 已提交
1173
  pParam->code = code;
1174
  if (code != 0) {
S
Shengliang Guan 已提交
1175
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
1176
    goto END;
1177
  }
L
Liu Jicong 已提交
1178

L
Liu Jicong 已提交
1179
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1180
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1181
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1182 1183
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
S
Shengliang Guan 已提交
1184
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1185 1186
  if (head->epoch <= epoch) {
    goto END;
1187
  }
L
Liu Jicong 已提交
1188

L
Liu Jicong 已提交
1189
  if (!async) {
L
Liu Jicong 已提交
1190 1191
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
S
Shengliang Guan 已提交
1192 1193
    /*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
1194
    tmqUpdateEp(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1195
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1196
  } else {
1197
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1198
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1199
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1200 1201
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1202
    }
L
Liu Jicong 已提交
1203 1204 1205
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1206
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1207

L
Liu Jicong 已提交
1208
    taosWriteQitem(tmq->mqueue, pWrapper);
1209
    tsem_post(&tmq->rspSem);
1210
  }
L
Liu Jicong 已提交
1211 1212

END:
L
Liu Jicong 已提交
1213
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1214
  if (!async) {
L
Liu Jicong 已提交
1215
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
1216 1217
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
1218 1219
  }
  return code;
1220 1221
}

L
Liu Jicong 已提交
1222
int32_t tmqAskEp(tmq_t* tmq, bool async) {
L
Liu Jicong 已提交
1223
  int32_t code = 0;
L
Liu Jicong 已提交
1224
#if 0
L
Liu Jicong 已提交
1225
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
L
fix txn  
Liu Jicong 已提交
1226
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
1227
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
S
Shengliang Guan 已提交
1228
    tscTrace("consumer:%" PRId64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
1229
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
1230
  }
L
temp  
Liu Jicong 已提交
1231
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
1232
#endif
L
Liu Jicong 已提交
1233
  int32_t      tlen = sizeof(SMqAskEpReq);
L
Liu Jicong 已提交
1234
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
L
Liu Jicong 已提交
1235
  if (req == NULL) {
L
Liu Jicong 已提交
1236
    tscError("failed to malloc get subscribe ep buf");
L
Liu Jicong 已提交
1237
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1238
    return -1;
L
Liu Jicong 已提交
1239
  }
L
Liu Jicong 已提交
1240 1241 1242
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);
1243

L
Liu Jicong 已提交
1244
  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
1245 1246
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
wafwerar's avatar
wafwerar 已提交
1247
    taosMemoryFree(req);
L
Liu Jicong 已提交
1248
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1249
    return -1;
L
Liu Jicong 已提交
1250 1251
  }
  pParam->tmq = tmq;
L
Liu Jicong 已提交
1252
  pParam->async = async;
X
Xiaoyu Wang 已提交
1253
  tsem_init(&pParam->rspSem, 0, 0);
1254

1255
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1256 1257
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1258 1259
    taosMemoryFree(pParam);
    taosMemoryFree(req);
L
Liu Jicong 已提交
1260
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1261 1262 1263 1264 1265 1266 1267 1268 1269 1270
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1271 1272 1273
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1274
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;
1275

L
Liu Jicong 已提交
1276 1277
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

S
Shengliang Guan 已提交
1278
  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);
L
add log  
Liu Jicong 已提交
1279

L
Liu Jicong 已提交
1280 1281
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1282

L
Liu Jicong 已提交
1283
  if (!async) {
L
Liu Jicong 已提交
1284 1285 1286 1287 1288
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    taosMemoryFree(pParam);
  }
  return code;
1289 1290
}

L
Liu Jicong 已提交
1291 1292
#if 0
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
L
Liu Jicong 已提交
1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1306
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1307 1308 1309 1310 1311 1312 1313
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
L
Liu Jicong 已提交
1314
#endif
L
Liu Jicong 已提交
1315

1316
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1317
  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
1318 1319 1320
  if (pReq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
1321

L
Liu Jicong 已提交
1322 1323 1324
  /*strcpy(pReq->topic, pTopic->topicName);*/
  /*strcpy(pReq->cgroup, tmq->groupId);*/

L
Liu Jicong 已提交
1325 1326 1327 1328
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, groupLen);
  pReq->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
1329

1330
  pReq->withTbName = tmq->withTbName;
1331
  pReq->timeout = timeout;
L
Liu Jicong 已提交
1332
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1333
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1334
  /*pReq->currentOffset = reqOffset;*/
L
Liu Jicong 已提交
1335
  pReq->reqOffset = pVg->currentOffset;
L
Liu Jicong 已提交
1336
  pReq->reqId = generateRequestId();
1337

L
Liu Jicong 已提交
1338 1339
  pReq->useSnapshot = tmq->useSnapshot;

1340
  pReq->head.vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
1341
  pReq->head.contLen = htonl(sizeof(SMqPollReq));
1342 1343 1344
  return pReq;
}

L
Liu Jicong 已提交
1345 1346
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
L
Liu Jicong 已提交
1347
  pRspObj->resType = RES_TYPE__TMQ_META;
L
Liu Jicong 已提交
1348 1349 1350 1351 1352 1353 1354 1355
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1356 1357 1358
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
L
Liu Jicong 已提交
1359 1360
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
1361
  pRspObj->vgId = pWrapper->vgHandle->vgId;
L
Liu Jicong 已提交
1362
  pRspObj->resIter = -1;
L
Liu Jicong 已提交
1363
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
L
Liu Jicong 已提交
1364

L
Liu Jicong 已提交
1365 1366
  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
L
Liu Jicong 已提交
1367
  if (!pWrapper->dataRsp.withSchema) {
L
Liu Jicong 已提交
1368 1369
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }
L
Liu Jicong 已提交
1370

L
Liu Jicong 已提交
1371
  return pRspObj;
X
Xiaoyu Wang 已提交
1372 1373
}

1374
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1375
  /*tscDebug("call poll");*/
X
Xiaoyu Wang 已提交
1376 1377 1378 1379 1380 1381
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1382
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1383 1384
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
X
Xiaoyu Wang 已提交
1385
        continue;
L
Liu Jicong 已提交
1386
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1387 1388 1389 1390
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
S
Shengliang Guan 已提交
1391
        tscDebug("consumer:%" PRId64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1392 1393
        }
#endif
X
Xiaoyu Wang 已提交
1394
      }
L
Liu Jicong 已提交
1395
      atomic_store_32(&pVg->vgSkipCnt, 0);
1396
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
X
Xiaoyu Wang 已提交
1397 1398
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1399
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1400 1401
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1402
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1403
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1404
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1405
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1406
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1407 1408
        return -1;
      }
L
Liu Jicong 已提交
1409 1410
      pParam->tmq = tmq;
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1411
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1412
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1413 1414
      pParam->epoch = tmq->epoch;

1415
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1416
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1417 1418
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1419
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1420
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1421 1422 1423 1424
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1425
          .pData = pReq,
L
Liu Jicong 已提交
1426
          .len = sizeof(SMqPollReq),
X
Xiaoyu Wang 已提交
1427 1428
          .handle = NULL,
      };
L
Liu Jicong 已提交
1429
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1430
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1431
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1432
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1433
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1434 1435

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1436
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1437

1438
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1439
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
L
Liu Jicong 已提交
1440 1441
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
S
Shengliang Guan 已提交
1442
      /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1443 1444 1445 1446 1447 1448 1449 1450
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1451 1452
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1453
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1454 1455
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1456
      SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;
L
Liu Jicong 已提交
1457
      tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1458
      /*tmqClearUnhandleMsg(tmq);*/
X
Xiaoyu Wang 已提交
1459 1460 1461 1462 1463 1464 1465 1466 1467 1468
      *pReset = true;
    } else {
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
1469
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1470
  while (1) {
L
Liu Jicong 已提交
1471 1472 1473
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1474
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1475 1476
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
X
Xiaoyu Wang 已提交
1477 1478
    }

L
Liu Jicong 已提交
1479 1480 1481 1482 1483
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1484
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1485
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
1486
      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1487
      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1488
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
S
Shengliang Guan 已提交
1489
        /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
L
Liu Jicong 已提交
1490
         * rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1491
        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
X
Xiaoyu Wang 已提交
1492
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1493
        if (pollRspWrapper->dataRsp.blockNum == 0) {
L
Liu Jicong 已提交
1494 1495
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1496 1497
          continue;
        }
L
Liu Jicong 已提交
1498
        // build rsp
L
Liu Jicong 已提交
1499
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1500
        taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
1501
        return pRsp;
X
Xiaoyu Wang 已提交
1502
      } else {
L
Liu Jicong 已提交
1503 1504 1505 1506 1507 1508 1509
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1510
      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1511
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
S
Shengliang Guan 已提交
1512
        /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
L
Liu Jicong 已提交
1513
         * rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1514 1515
        pVg->currentOffset.version = pollRspWrapper->metaRsp.rspOffset;
        pVg->currentOffset.type = TMQ_OFFSET__LOG;
L
Liu Jicong 已提交
1516 1517
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
L
Liu Jicong 已提交
1518
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1519 1520 1521 1522
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
L
Liu Jicong 已提交
1523
                 pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
L
Liu Jicong 已提交
1524
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1525 1526
      }
    } else {
L
fix  
Liu Jicong 已提交
1527
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1528
      bool reset = false;
L
Liu Jicong 已提交
1529 1530
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1531
      if (pollIfReset && reset) {
S
Shengliang Guan 已提交
1532
        tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
1533
        tmqPollImpl(tmq, timeout);
X
Xiaoyu Wang 已提交
1534 1535 1536 1537 1538
      }
    }
  }
}

1539
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1540
  /*tscDebug("call poll1");*/
L
Liu Jicong 已提交
1541 1542
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1543

1544 1545 1546
#if 0
  tmqHandleAllDelayedTask(tmq);
  tmqPollImpl(tmq, timeout);
1547
  rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1548 1549
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1550
  }
1551
#endif
X
Xiaoyu Wang 已提交
1552

L
Liu Jicong 已提交
1553
  // in no topic status also need process delayed task
L
Liu Jicong 已提交
1554
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
1555 1556 1557
    return NULL;
  }

L
Liu Jicong 已提交
1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

X
Xiaoyu Wang 已提交
1569
  while (1) {
L
Liu Jicong 已提交
1570
    tmqHandleAllDelayedTask(tmq);
1571
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;
L
Liu Jicong 已提交
1572

1573
    rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1574 1575
    if (rspObj) {
      return (TAOS_RES*)rspObj;
L
Liu Jicong 已提交
1576 1577
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
X
Xiaoyu Wang 已提交
1578
    }
1579
    if (timeout != -1) {
X
Xiaoyu Wang 已提交
1580
      int64_t endTime = taosGetTimestampMs();
1581
      int64_t leftTime = endTime - startTime;
1582
      if (leftTime > timeout) {
S
Shengliang Guan 已提交
1583
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
X
Xiaoyu Wang 已提交
1584 1585
        return NULL;
      }
1586
      tsem_timewait(&tmq->rspSem, leftTime * 1000);
L
Liu Jicong 已提交
1587 1588 1589
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
X
Xiaoyu Wang 已提交
1590 1591 1592 1593
    }
  }
}

L
Liu Jicong 已提交
1594
int32_t tmq_consumer_close(tmq_t* tmq) {
1595
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
L
Liu Jicong 已提交
1596 1597
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
L
Liu Jicong 已提交
1598
      return rsp;
1599 1600
    }

L
Liu Jicong 已提交
1601
    int32_t     retryCnt = 0;
1602
    tmq_list_t* lst = tmq_list_new();
L
Liu Jicong 已提交
1603 1604 1605 1606 1607 1608 1609 1610 1611 1612
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      } else {
        retryCnt++;
        taosMsleep(500);
      }
    }

1613
    tmq_list_destroy(lst);
1614

L
Liu Jicong 已提交
1615 1616
    /*return rsp;*/
    return 0;
L
Liu Jicong 已提交
1617
  }
1618
  // TODO: free resources
L
Liu Jicong 已提交
1619
  return 0;
1620
}
L
Liu Jicong 已提交
1621

L
Liu Jicong 已提交
1622 1623
const char* tmq_err2str(int32_t err) {
  if (err == 0) {
L
Liu Jicong 已提交
1624
    return "success";
L
Liu Jicong 已提交
1625
  } else if (err == -1) {
L
Liu Jicong 已提交
1626 1627 1628
    return "fail";
  } else {
    return tstrerror(err);
L
Liu Jicong 已提交
1629 1630
  }
}
L
Liu Jicong 已提交
1631

L
Liu Jicong 已提交
1632 1633 1634 1635
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
wmmhello's avatar
wmmhello 已提交
1636 1637 1638 1639
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
      return TMQ_RES_DATA;
    }
L
Liu Jicong 已提交
1640 1641 1642 1643 1644 1645
    return TMQ_RES_TABLE_META;
  } else {
    return TMQ_RES_INVALID;
  }
}

L
Liu Jicong 已提交
1646
const char* tmq_get_topic_name(TAOS_RES* res) {
L
Liu Jicong 已提交
1647 1648
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
L
Liu Jicong 已提交
1649
    return strchr(pRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1650 1651 1652
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1653 1654 1655 1656 1657
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1658 1659 1660 1661
const char* tmq_get_db_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return strchr(pRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1662 1663 1664
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1665 1666 1667 1668 1669
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1670 1671 1672 1673
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
L
Liu Jicong 已提交
1674 1675 1676
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return pMetaRspObj->vgId;
L
Liu Jicong 已提交
1677 1678 1679 1680
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
1681 1682 1683 1684 1685 1686 1687 1688

const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
1689
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
L
Liu Jicong 已提交
1690 1691 1692
  }
  return NULL;
}
1693

1694 1695
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
                                  int8_t t) {
wmmhello's avatar
wmmhello 已提交
1696 1697 1698 1699 1700 1701 1702
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
wmmhello's avatar
wmmhello 已提交
1703

1704 1705 1706 1707
  //  char uid[32] = {0};
  //  sprintf(uid, "%"PRIi64, id);
  //  cJSON* id_ = cJSON_CreateString(uid);
  //  cJSON_AddItemToObject(json, "id", id_);
wmmhello's avatar
wmmhello 已提交
1708 1709 1710 1711
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
  cJSON_AddItemToObject(json, "tableType", tableType);
1712 1713
  //  cJSON* version = cJSON_CreateNumber(1);
  //  cJSON_AddItemToObject(json, "version", version);
wmmhello's avatar
wmmhello 已提交
1714 1715

  cJSON* columns = cJSON_CreateArray();
1716 1717 1718 1719
  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON*   column = cJSON_CreateObject();
    SSchema* s = schemaRow->pSchema + i;
    cJSON*   cname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1720 1721 1722
    cJSON_AddItemToObject(column, "name", cname);
    cJSON* ctype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(column, "type", ctype);
1723
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1724
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1725
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1726
      cJSON_AddItemToObject(column, "length", cbytes);
1727 1728 1729
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1730 1731
      cJSON_AddItemToObject(column, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1732 1733 1734 1735 1736
    cJSON_AddItemToArray(columns, column);
  }
  cJSON_AddItemToObject(json, "columns", columns);

  cJSON* tags = cJSON_CreateArray();
1737 1738 1739 1740
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON*   tag = cJSON_CreateObject();
    SSchema* s = schemaTag->pSchema + i;
    cJSON*   tname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1741 1742 1743
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(tag, "type", ttype);
1744
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1745
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1746
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1747
      cJSON_AddItemToObject(tag, "length", cbytes);
1748 1749 1750
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1751 1752
      cJSON_AddItemToObject(tag, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1753 1754 1755 1756 1757 1758 1759 1760 1761
    cJSON_AddItemToArray(tags, tag);
  }
  cJSON_AddItemToObject(json, "tags", tags);

  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  return string;
}

1762 1763 1764 1765
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  char*         string = NULL;
wmmhello's avatar
wmmhello 已提交
1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto end;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
1777 1778
  //  cJSON* uid = cJSON_CreateNumber(id);
  //  cJSON_AddItemToObject(json, "uid", uid);
wmmhello's avatar
wmmhello 已提交
1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790
  SName name = {0};
  tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  cJSON* tableName = cJSON_CreateString(name.tname);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  cJSON* alterType = cJSON_CreateNumber(req.alterType);
  cJSON_AddItemToObject(json, "alterType", alterType);
  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
1791 1792
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
1793 1794 1795 1796
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);

1797
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1798
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
1799
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1800
        cJSON_AddItemToObject(json, "colLength", cbytes);
1801 1802 1803
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1804 1805 1806 1807 1808
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
1809 1810 1811
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
1812 1813 1814 1815
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
1816 1817 1818
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
1819 1820 1821
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);
1822
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1823
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
1824
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1825
        cJSON_AddItemToObject(json, "colLength", cbytes);
1826 1827 1828
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1829 1830 1831 1832 1833
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
1834 1835 1836 1837
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      cJSON*      colName = cJSON_CreateString(oldField->name);
wmmhello's avatar
wmmhello 已提交
1838 1839 1840 1841 1842 1843 1844 1845 1846 1847
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(newField->name);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

1848
end:
wmmhello's avatar
wmmhello 已提交
1849 1850 1851 1852 1853
  cJSON_Delete(json);
  tFreeSMAltertbReq(&req);
  return string;
}

1854
static char* processCreateStb(SMqMetaRsp* metaRsp) {
wmmhello's avatar
wmmhello 已提交
1855 1856
  SVCreateStbReq req = {0};
  SDecoder       coder;
1857
  char*          string = NULL;
wmmhello's avatar
wmmhello 已提交
1858 1859

  // decode and process req
1860
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
1861 1862 1863 1864 1865 1866 1867 1868 1869 1870
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    goto _err;
  }
  string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
  tDecoderClear(&coder);
  return string;

1871
_err:
wmmhello's avatar
wmmhello 已提交
1872 1873 1874 1875
  tDecoderClear(&coder);
  return string;
}

1876
static char* processAlterStb(SMqMetaRsp* metaRsp) {
wmmhello's avatar
wmmhello 已提交
1877 1878
  SVCreateStbReq req = {0};
  SDecoder       coder;
1879
  char*          string = NULL;
wmmhello's avatar
wmmhello 已提交
1880 1881

  // decode and process req
1882
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
1883 1884 1885 1886 1887 1888 1889 1890 1891 1892
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    goto _err;
  }
  string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
  tDecoderClear(&coder);
  return string;

1893
_err:
wmmhello's avatar
wmmhello 已提交
1894 1895 1896 1897
  tDecoderClear(&coder);
  return string;
}

1898
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum) {
1899
  char*   string = NULL;
wmmhello's avatar
wmmhello 已提交
1900
  SArray* pTagVals = NULL;
1901
  cJSON*  json = cJSON_CreateObject();
wmmhello's avatar
wmmhello 已提交
1902 1903 1904 1905 1906
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
1907 1908 1909 1910
  //  char cid[32] = {0};
  //  sprintf(cid, "%"PRIi64, id);
  //  cJSON* cid_ = cJSON_CreateString(cid);
  //  cJSON_AddItemToObject(json, "id", cid_);
wmmhello's avatar
wmmhello 已提交
1911

wmmhello's avatar
wmmhello 已提交
1912 1913 1914 1915
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("child");
  cJSON_AddItemToObject(json, "tableType", tableType);
wmmhello's avatar
wmmhello 已提交
1916
  cJSON* using = cJSON_CreateString(sname);
wmmhello's avatar
wmmhello 已提交
1917
  cJSON_AddItemToObject(json, "using", using);
1918 1919
  cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
  cJSON_AddItemToObject(json, "tagNum", tagNumJson);
1920 1921
  //  cJSON* version = cJSON_CreateNumber(1);
  //  cJSON_AddItemToObject(json, "version", version);
wmmhello's avatar
wmmhello 已提交
1922

1923
  cJSON*  tags = cJSON_CreateArray();
wmmhello's avatar
wmmhello 已提交
1924 1925 1926 1927
  int32_t code = tTagToValArray(pTag, &pTagVals);
  if (code) {
    goto end;
  }
wmmhello's avatar
wmmhello 已提交
1928

wmmhello's avatar
wmmhello 已提交
1929 1930
  if (tTagIsJson(pTag)) {
    STag* p = (STag*)pTag;
1931
    if (p->nTag == 0) {
wmmhello's avatar
wmmhello 已提交
1932 1933
      goto end;
    }
1934 1935
    char*    pJson = parseTagDatatoJson(pTag);
    cJSON*   tag = cJSON_CreateObject();
wmmhello's avatar
wmmhello 已提交
1936 1937
    STagVal* pTagVal = taosArrayGet(pTagVals, 0);

wmmhello's avatar
wmmhello 已提交
1938 1939
    char*  ptname = taosArrayGet(tagName, 0);
    cJSON* tname = cJSON_CreateString(ptname);
wmmhello's avatar
wmmhello 已提交
1940
    cJSON_AddItemToObject(tag, "name", tname);
1941 1942
    //    cJSON* cid_ = cJSON_CreateString("");
    //    cJSON_AddItemToObject(tag, "cid", cid_);
wmmhello's avatar
wmmhello 已提交
1943 1944
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    cJSON_AddItemToObject(tag, "type", ttype);
wmmhello's avatar
wmmhello 已提交
1945
    cJSON* tvalue = cJSON_CreateString(pJson);
wmmhello's avatar
wmmhello 已提交
1946 1947
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
wmmhello's avatar
wmmhello 已提交
1948
    taosMemoryFree(pJson);
wmmhello's avatar
wmmhello 已提交
1949 1950 1951
    goto end;
  }

1952
  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
wmmhello's avatar
wmmhello 已提交
1953 1954 1955
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);

    cJSON* tag = cJSON_CreateObject();
wmmhello's avatar
wmmhello 已提交
1956

wmmhello's avatar
wmmhello 已提交
1957 1958
    char*  ptname = taosArrayGet(tagName, i);
    cJSON* tname = cJSON_CreateString(ptname);
wmmhello's avatar
wmmhello 已提交
1959
    cJSON_AddItemToObject(tag, "name", tname);
1960 1961
    //    cJSON* cid = cJSON_CreateNumber(pTagVal->cid);
    //    cJSON_AddItemToObject(tag, "cid", cid);
wmmhello's avatar
wmmhello 已提交
1962 1963
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
    cJSON_AddItemToObject(tag, "type", ttype);
wmmhello's avatar
wmmhello 已提交
1964

wmmhello's avatar
wmmhello 已提交
1965
    cJSON* tvalue = NULL;
wmmhello's avatar
wmmhello 已提交
1966
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
wmmhello's avatar
wmmhello 已提交
1967
      char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
L
Liu Jicong 已提交
1968
      if (!buf) goto end;
wmmhello's avatar
wmmhello 已提交
1969
      dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
wmmhello's avatar
wmmhello 已提交
1970 1971
      tvalue = cJSON_CreateString(buf);
      taosMemoryFree(buf);
wmmhello's avatar
wmmhello 已提交
1972
    } else {
wmmhello's avatar
wmmhello 已提交
1973 1974 1975
      double val = 0;
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64);
      tvalue = cJSON_CreateNumber(val);
wmmhello's avatar
wmmhello 已提交
1976 1977
    }

wmmhello's avatar
wmmhello 已提交
1978 1979 1980
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
  }
wmmhello's avatar
wmmhello 已提交
1981

1982
end:
wmmhello's avatar
wmmhello 已提交
1983 1984 1985
  cJSON_AddItemToObject(json, "tags", tags);
  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
wmmhello's avatar
wmmhello 已提交
1986
  taosArrayDestroy(pTagVals);
wmmhello's avatar
wmmhello 已提交
1987 1988 1989
  return string;
}

1990
static char* processCreateTable(SMqMetaRsp* metaRsp) {
wmmhello's avatar
wmmhello 已提交
1991 1992
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
1993 1994
  SVCreateTbReq*     pCreateReq;
  char*              string = NULL;
wmmhello's avatar
wmmhello 已提交
1995
  // decode
1996
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
1997 1998 1999 2000 2001 2002 2003 2004 2005
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
2006 2007
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
2008
                                     pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum);
2009 2010 2011
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
      string =
          buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
wmmhello's avatar
wmmhello 已提交
2012 2013 2014 2015 2016
    }
  }

  tDecoderClear(&decoder);

2017
_exit:
wmmhello's avatar
wmmhello 已提交
2018 2019 2020 2021
  tDecoderClear(&decoder);
  return string;
}

2022 2023 2024 2025
static char* processAlterTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  char*        string = NULL;
wmmhello's avatar
wmmhello 已提交
2026 2027

  // decode
2028
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    goto _exit;
  }

  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
2041 2042
  //  cJSON* uid = cJSON_CreateNumber(id);
  //  cJSON_AddItemToObject(json, "uid", uid);
wmmhello's avatar
wmmhello 已提交
2043 2044
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  cJSON_AddItemToObject(json, "tableName", tableName);
wmmhello's avatar
wmmhello 已提交
2045
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
wmmhello's avatar
wmmhello 已提交
2046
  cJSON_AddItemToObject(json, "tableType", tableType);
wmmhello's avatar
wmmhello 已提交
2047 2048
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  cJSON_AddItemToObject(json, "alterType", alterType);
wmmhello's avatar
wmmhello 已提交
2049 2050 2051 2052 2053 2054 2055

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      cJSON_AddItemToObject(json, "colType", colType);
wmmhello's avatar
wmmhello 已提交
2056

2057
      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
2058
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
2059
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2060
        cJSON_AddItemToObject(json, "colLength", cbytes);
2061 2062 2063
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2064 2065
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
wmmhello's avatar
wmmhello 已提交
2066 2067
      break;
    }
2068
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
wmmhello's avatar
wmmhello 已提交
2069 2070 2071 2072
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
2073
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
wmmhello's avatar
wmmhello 已提交
2074 2075
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
wmmhello's avatar
wmmhello 已提交
2076
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
wmmhello's avatar
wmmhello 已提交
2077
      cJSON_AddItemToObject(json, "colType", colType);
2078
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
2079
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
2080
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2081
        cJSON_AddItemToObject(json, "colLength", cbytes);
2082 2083 2084
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2085 2086
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
wmmhello's avatar
wmmhello 已提交
2087 2088
      break;
    }
2089
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
wmmhello's avatar
wmmhello 已提交
2090 2091 2092 2093 2094 2095
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
2096
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
wmmhello's avatar
wmmhello 已提交
2097 2098
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      cJSON_AddItemToObject(json, "colName", tagName);
wmmhello's avatar
wmmhello 已提交
2099

wmmhello's avatar
wmmhello 已提交
2100
      bool isNull = vAlterTbReq.isNull;
2101 2102 2103
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
wmmhello's avatar
wmmhello 已提交
2104
      }
2105
      if (!isNull) {
wmmhello's avatar
wmmhello 已提交
2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true);
          buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
        } else {
          buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 1, 1);
          dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
        }

        cJSON* colValue = cJSON_CreateString(buf);
        cJSON_AddItemToObject(json, "colValue", colValue);
        taosMemoryFree(buf);
      }

wmmhello's avatar
wmmhello 已提交
2121 2122
      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      cJSON_AddItemToObject(json, "colValueNull", isNullCJson);
wmmhello's avatar
wmmhello 已提交
2123 2124
      break;
    }
wmmhello's avatar
wmmhello 已提交
2125 2126 2127 2128 2129
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

2130
_exit:
wmmhello's avatar
wmmhello 已提交
2131 2132 2133 2134
  tDecoderClear(&decoder);
  return string;
}

2135 2136 2137 2138
static char* processDropSTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  char*        string = NULL;
wmmhello's avatar
wmmhello 已提交
2139 2140

  // decode
2141
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    goto _exit;
  }

  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(req.name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  string = cJSON_PrintUnformatted(json);

2161
_exit:
wmmhello's avatar
wmmhello 已提交
2162 2163 2164 2165
  tDecoderClear(&decoder);
  return string;
}

2166 2167 2168 2169
static char* processDropTable(SMqMetaRsp* metaRsp) {
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  char*            string = NULL;
wmmhello's avatar
wmmhello 已提交
2170 2171

  // decode
2172
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);
2185 2186 2187 2188
  //  cJSON* uid = cJSON_CreateNumber(id);
  //  cJSON_AddItemToObject(json, "uid", uid);
  //  cJSON* tableType = cJSON_CreateString("normal");
  //  cJSON_AddItemToObject(json, "tableType", tableType);
wmmhello's avatar
wmmhello 已提交
2189 2190 2191 2192 2193

  cJSON* tableNameList = cJSON_CreateArray();
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;

wmmhello's avatar
wmmhello 已提交
2194
    cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
wmmhello's avatar
wmmhello 已提交
2195 2196 2197 2198 2199 2200
    cJSON_AddItemToArray(tableNameList, tableName);
  }
  cJSON_AddItemToObject(json, "tableNameList", tableNameList);

  string = cJSON_PrintUnformatted(json);

2201
_exit:
wmmhello's avatar
wmmhello 已提交
2202 2203 2204 2205
  tDecoderClear(&decoder);
  return string;
}

2206
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
wmmhello's avatar
wmmhello 已提交
2207 2208 2209
  SVCreateStbReq req = {0};
  SDecoder       coder;
  SMCreateStbReq pReq = {0};
2210 2211
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
wmmhello's avatar
wmmhello 已提交
2212

2213
  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
wmmhello's avatar
wmmhello 已提交
2214 2215 2216 2217
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

2218
  if (!pRequest->pDb) {
wmmhello's avatar
wmmhello 已提交
2219 2220 2221 2222
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
2223
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
2224 2225 2226 2227 2228 2229 2230 2231
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
2232
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
wmmhello's avatar
wmmhello 已提交
2233
    SSchema* pSchema = req.schemaRow.pSchema + i;
2234
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
wmmhello's avatar
wmmhello 已提交
2235 2236 2237 2238
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pColumns, &field);
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
2239
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
wmmhello's avatar
wmmhello 已提交
2240
    SSchema* pSchema = req.schemaTag.pSchema + i;
2241
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
wmmhello's avatar
wmmhello 已提交
2242 2243 2244
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pTags, &field);
  }
2245

2246 2247
  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
wmmhello's avatar
wmmhello 已提交
2248 2249 2250 2251
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = req.suid;
wmmhello's avatar
wmmhello 已提交
2252
  pReq.source = TD_REQ_FROM_TAOX;
wmmhello's avatar
wmmhello 已提交
2253
  pReq.igExists = true;
wmmhello's avatar
wmmhello 已提交
2254

2255
  STscObj* pTscObj = pRequest->pTscObj;
2256
  SName    tableName;
wmmhello's avatar
wmmhello 已提交
2257
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
wmmhello's avatar
wmmhello 已提交
2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269

  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

2270
  SQuery pQuery = {0};
wmmhello's avatar
wmmhello 已提交
2271 2272 2273 2274 2275 2276
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
wmmhello's avatar
wmmhello 已提交
2277

L
Liu Jicong 已提交
2278 2279
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SCatalog* pCatalog = NULL;
wmmhello's avatar
wmmhello 已提交
2280 2281 2282 2283
    catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveTableMeta(pCatalog, &tableName);
  }

wmmhello's avatar
wmmhello 已提交
2284 2285 2286
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

2287
end:
wmmhello's avatar
wmmhello 已提交
2288 2289 2290 2291 2292 2293
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}

2294
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
wmmhello's avatar
wmmhello 已提交
2295 2296 2297
  SVDropStbReq req = {0};
  SDecoder     coder;
  SMDropStbReq pReq = {0};
2298
  int32_t      code = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2299 2300
  SRequestObj* pRequest = NULL;

2301
  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
wmmhello's avatar
wmmhello 已提交
2302 2303 2304 2305
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

2306
  if (!pRequest->pDb) {
wmmhello's avatar
wmmhello 已提交
2307 2308 2309 2310
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
2311
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
2312 2313 2314 2315 2316 2317 2318 2319 2320
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build drop stable
  pReq.igNotExists = true;
wmmhello's avatar
wmmhello 已提交
2321
  pReq.source = TD_REQ_FROM_TAOX;
wmmhello's avatar
wmmhello 已提交
2322
  pReq.suid = req.suid;
2323 2324

  STscObj* pTscObj = pRequest->pTscObj;
wmmhello's avatar
wmmhello 已提交
2325
  SName    tableName = {0};
wmmhello's avatar
wmmhello 已提交
2326
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
wmmhello's avatar
wmmhello 已提交
2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338

  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

2339
  SQuery pQuery = {0};
wmmhello's avatar
wmmhello 已提交
2340 2341 2342 2343 2344 2345
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
wmmhello's avatar
wmmhello 已提交
2346

L
Liu Jicong 已提交
2347 2348
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SCatalog* pCatalog = NULL;
wmmhello's avatar
wmmhello 已提交
2349 2350 2351 2352
    catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveTableMeta(pCatalog, &tableName);
  }

wmmhello's avatar
wmmhello 已提交
2353 2354 2355
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

2356
end:
wmmhello's avatar
wmmhello 已提交
2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}

typedef struct SVgroupCreateTableBatch {
  SVCreateTbBatchReq req;
  SVgroupInfo        info;
  char               dbName[TSDB_DB_NAME_LEN];
} SVgroupCreateTableBatch;

static void destroyCreateTbReqBatch(void* data) {
2369
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
wmmhello's avatar
wmmhello 已提交
2370 2371 2372
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2373 2374 2375 2376 2377 2378 2379
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;
wmmhello's avatar
wmmhello 已提交
2380

L
Liu Jicong 已提交
2381
  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
wmmhello's avatar
wmmhello 已提交
2382 2383 2384 2385
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

2386
  if (!pRequest->pDb) {
wmmhello's avatar
wmmhello 已提交
2387 2388 2389 2390
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
2391
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
2392 2393 2394 2395 2396 2397 2398
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

2399 2400
  STscObj* pTscObj = pRequest->pTscObj;

L
Liu Jicong 已提交
2401 2402
  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
wmmhello's avatar
wmmhello 已提交
2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
2416 2417 2418
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
wmmhello's avatar
wmmhello 已提交
2419 2420

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
wmmhello's avatar
wmmhello 已提交
2421 2422 2423 2424 2425
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
wmmhello's avatar
wmmhello 已提交
2426
    SName       pName = {0};
wmmhello's avatar
wmmhello 已提交
2427 2428 2429 2430 2431
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
wmmhello's avatar
wmmhello 已提交
2432
    taosArrayPush(pRequest->tableList, &pName);
wmmhello's avatar
wmmhello 已提交
2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      strcpy(tBatch.dbName, pRequest->pDb);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      taosArrayPush(tBatch.req.pArray, pCreateReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pCreateReq);
    }
  }

  SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
2459
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
wmmhello's avatar
wmmhello 已提交
2460 2461 2462 2463 2464 2465

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2466
  launchQueryImpl(pRequest, pQuery, true, NULL);
L
Liu Jicong 已提交
2467
  if (pRequest->code == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2468 2469 2470
    removeMeta(pTscObj, pRequest->tableList);
  }

2471
  code = pRequest->code;
wmmhello's avatar
wmmhello 已提交
2472

2473
end:
wmmhello's avatar
wmmhello 已提交
2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

typedef struct SVgroupDropTableBatch {
  SVDropTbBatchReq req;
  SVgroupInfo      info;
  char             dbName[TSDB_DB_NAME_LEN];
} SVgroupDropTableBatch;

static void destroyDropTbReqBatch(void* data) {
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2492 2493 2494 2495 2496 2497 2498
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;
wmmhello's avatar
wmmhello 已提交
2499

2500
  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
wmmhello's avatar
wmmhello 已提交
2501 2502 2503 2504
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

2505
  if (!pRequest->pDb) {
wmmhello's avatar
wmmhello 已提交
2506 2507 2508 2509
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
2510
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
2511 2512 2513 2514 2515 2516 2517
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

2518 2519
  STscObj* pTscObj = pRequest->pTscObj;

L
Liu Jicong 已提交
2520 2521
  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
wmmhello's avatar
wmmhello 已提交
2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
2535 2536 2537
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
wmmhello's avatar
wmmhello 已提交
2538
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
wmmhello's avatar
wmmhello 已提交
2539 2540 2541
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
wmmhello's avatar
wmmhello 已提交
2542
    pDropReq->igNotExists = true;
wmmhello's avatar
wmmhello 已提交
2543 2544

    SVgroupInfo pInfo = {0};
wmmhello's avatar
wmmhello 已提交
2545
    SName       pName = {0};
wmmhello's avatar
wmmhello 已提交
2546 2547 2548 2549 2550 2551
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

wmmhello's avatar
wmmhello 已提交
2552
    taosArrayPush(pRequest->tableList, &pName);
wmmhello's avatar
wmmhello 已提交
2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      taosArrayPush(tBatch.req.pArray, pDropReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pDropReq);
    }
  }

  SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
2576
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
wmmhello's avatar
wmmhello 已提交
2577 2578 2579 2580 2581 2582

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2583
  launchQueryImpl(pRequest, pQuery, true, NULL);
L
Liu Jicong 已提交
2584
  if (pRequest->code == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2585 2586
    removeMeta(pTscObj, pRequest->tableList);
  }
2587
  code = pRequest->code;
wmmhello's avatar
wmmhello 已提交
2588

2589
end:
wmmhello's avatar
wmmhello 已提交
2590 2591 2592 2593 2594 2595 2596
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

wmmhello's avatar
wmmhello 已提交
2597 2598
// delete from db.tabl where ..       -> delete from tabl where ..
// delete from db    .tabl where ..   -> delete from tabl where ..
L
Liu Jicong 已提交
2599
// static void getTbName(char *sql){
2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627
//  char *ch = sql;
//
//  bool inBackQuote = false;
//  int8_t dotIndex = 0;
//  while(*ch != '\0'){
//    if(!inBackQuote && *ch == '`'){
//      inBackQuote = true;
//      ch++;
//      continue;
//    }
//
//    if(inBackQuote && *ch == '`'){
//      inBackQuote = false;
//      ch++;
//
//      continue;
//    }
//
//    if(!inBackQuote && *ch == '.'){
//      dotIndex ++;
//      if(dotIndex == 2){
//        memmove(sql, ch + 1, strlen(ch + 1) + 1);
//        break;
//      }
//    }
//    ch++;
//  }
//}
wmmhello's avatar
wmmhello 已提交
2628 2629

static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
L
Liu Jicong 已提交
2630 2631 2632
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2633 2634 2635 2636 2637 2638 2639 2640 2641 2642

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

L
Liu Jicong 已提交
2643
  //  getTbName(req.tableFName);
wmmhello's avatar
wmmhello 已提交
2644
  char sql[256] = {0};
L
Liu Jicong 已提交
2645 2646
  sprintf(sql, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName,
          req.skey, req.tsColName, req.ekey);
wmmhello's avatar
wmmhello 已提交
2647 2648
  printf("delete sql:%s\n", sql);

L
Liu Jicong 已提交
2649 2650
  TAOS_RES*    res = taos_query(taos, sql);
  SRequestObj* pRequest = (SRequestObj*)res;
wmmhello's avatar
wmmhello 已提交
2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);

end:
  tDecoderClear(&coder);
  return code;
}

2662 2663 2664 2665 2666 2667 2668 2669
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       coder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;
wmmhello's avatar
wmmhello 已提交
2670

2671
  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
wmmhello's avatar
wmmhello 已提交
2672 2673 2674 2675 2676

  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

2677
  if (!pRequest->pDb) {
wmmhello's avatar
wmmhello 已提交
2678 2679 2680 2681
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
2682
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
wmmhello's avatar
wmmhello 已提交
2683 2684 2685 2686 2687 2688 2689 2690
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
2691
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
wmmhello's avatar
wmmhello 已提交
2692 2693 2694
    goto end;
  }

2695
  STscObj*  pTscObj = pRequest->pTscObj;
2696
  SCatalog* pCatalog = NULL;
wmmhello's avatar
wmmhello 已提交
2697 2698 2699 2700 2701 2702
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
2703 2704 2705
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
wmmhello's avatar
wmmhello 已提交
2706 2707

  SVgroupInfo pInfo = {0};
2708
  SName       pName = {0};
wmmhello's avatar
wmmhello 已提交
2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pArray = taosArrayInit(1, sizeof(void*));
  if (NULL == pArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == pVgData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pVgData->vg = pInfo;
  pVgData->pData = taosMemoryMalloc(metaLen);
  if (NULL == pVgData->pData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  memcpy(pVgData->pData, meta, metaLen);
  ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
  pVgData->size = metaLen;
  pVgData->numOfTables = 1;
  taosArrayPush(pArray, &pVgData);

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
2742
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
wmmhello's avatar
wmmhello 已提交
2743 2744 2745 2746 2747 2748

  code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2749 2750
  launchQueryImpl(pRequest, pQuery, true, NULL);

wmmhello's avatar
wmmhello 已提交
2751
  pVgData = NULL;
2752 2753 2754
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
wmmhello's avatar
wmmhello 已提交
2755
    code = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2756
  }
wmmhello's avatar
wmmhello 已提交
2757

L
Liu Jicong 已提交
2758
  if (pRequest->code == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2759
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
L
Liu Jicong 已提交
2760
    if (pRes->res != NULL) {
wmmhello's avatar
wmmhello 已提交
2761 2762 2763
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
2764
end:
wmmhello's avatar
wmmhello 已提交
2765
  taosArrayDestroy(pArray);
2766
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
wmmhello's avatar
wmmhello 已提交
2767 2768 2769 2770 2771 2772 2773
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

L
Liu Jicong 已提交
2774
typedef struct {
2775
  SVgroupInfo vg;
L
Liu Jicong 已提交
2776 2777
  void*       data;
} VgData;
2778 2779 2780 2781 2782 2783

static void destroyVgHash(void* data) {
  VgData* vgData = (VgData*)data;
  taosMemoryFreeClear(vgData->data);
}

L
Liu Jicong 已提交
2784 2785
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  int32_t     code = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2786
  STableMeta* pTableMeta = NULL;
L
Liu Jicong 已提交
2787
  SQuery*     pQuery = NULL;
wmmhello's avatar
wmmhello 已提交
2788 2789

  SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
L
Liu Jicong 已提交
2790
  if (!pRequest) {
wmmhello's avatar
wmmhello 已提交
2791 2792 2793
    uError("WriteRaw:createRequest error request is null");
    code = terrno;
    goto end;
2794 2795
  }

wmmhello's avatar
wmmhello 已提交
2796 2797 2798 2799 2800 2801 2802 2803 2804 2805
  if (!pRequest->pDb) {
    uError("WriteRaw:not use db");
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  strcpy(pName.dbname, pRequest->pDb);
  strcpy(pName.tname, tbname);

L
Liu Jicong 已提交
2806
  struct SCatalog* pCatalog = NULL;
wmmhello's avatar
wmmhello 已提交
2807
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
L
Liu Jicong 已提交
2808
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832
    uError("WriteRaw: get gatlog error");
    goto end;
  }

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  SVgroupInfo vgData = {0};
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname);
    goto end;
  }

  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
    goto end;
  }
  uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
  uint64_t uid = pTableMeta->uid;
L
Liu Jicong 已提交
2833
  int32_t  numOfCols = pTableMeta->tableInfo.numOfColumns;
wmmhello's avatar
wmmhello 已提交
2834 2835

  uint16_t fLen = 0;
L
Liu Jicong 已提交
2836 2837
  int32_t  rowSize = 0;
  int16_t  nVar = 0;
wmmhello's avatar
wmmhello 已提交
2838
  for (int i = 0; i < numOfCols; i++) {
L
Liu Jicong 已提交
2839
    SSchema* schema = pTableMeta->schema + i;
wmmhello's avatar
wmmhello 已提交
2840 2841
    fLen += TYPE_BYTES[schema->type];
    rowSize += schema->bytes;
L
Liu Jicong 已提交
2842 2843
    if (IS_VAR_DATA_TYPE(schema->type)) {
      nVar++;
wmmhello's avatar
wmmhello 已提交
2844 2845 2846 2847 2848 2849 2850 2851
    }
  }

  int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
                            (int32_t)TD_BITMAP_BYTES(numOfCols - 1);
  int32_t schemaLen = 0;
  int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;

L
Liu Jicong 已提交
2852
  int32_t     totalLen = sizeof(SSubmitReq) + submitLen;
wmmhello's avatar
wmmhello 已提交
2853 2854
  SSubmitReq* subReq = taosMemoryCalloc(1, totalLen);
  SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
L
Liu Jicong 已提交
2855 2856
  void*       blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
  STSRow*     rowData = POINTER_SHIFT(blkSchema, schemaLen);
wmmhello's avatar
wmmhello 已提交
2857 2858 2859 2860 2861 2862

  SRowBuilder rb = {0};
  tdSRowInit(&rb, pTableMeta->sversion);
  tdSRowSetTpInfo(&rb, numOfCols, fLen);
  int32_t dataLen = 0;

L
Liu Jicong 已提交
2863
  char*    pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
wmmhello's avatar
wmmhello 已提交
2864 2865 2866
  int32_t* colLength = (int32_t*)pStart;
  pStart += sizeof(int32_t) * numOfCols;

L
Liu Jicong 已提交
2867
  SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
wmmhello's avatar
wmmhello 已提交
2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
      pCol[i].offset = (int32_t*)pStart;
      pStart += rows * sizeof(int32_t);
    } else {
      pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(rows);
    }

    pCol[i].pData = pStart;
    pStart += colLength[i];
  }

  for (int32_t j = 0; j < rows; j++) {
    tdSRowResetBuf(&rb, rowData);
    int32_t offset = 0;
    for (int32_t k = 0; k < numOfCols; k++) {
L
Liu Jicong 已提交
2886
      const SSchema* pColumn = &pTableMeta->schema[k];
wmmhello's avatar
wmmhello 已提交
2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916

      if (IS_VAR_DATA_TYPE(pColumn->type)) {
        if (pCol[k].offset[j] != -1) {
          char* data = pCol[k].pData + pCol[k].offset[j];
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        } else {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        }
      } else {
        if (!colDataIsNull_f(pCol[k].nullbitmap, j)) {
          char* data = pCol[k].pData + pColumn->bytes * j;
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        } else {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        }
      }

      offset += TYPE_BYTES[pColumn->type];
    }
    int32_t rowLen = TD_ROW_LEN(rowData);
    rowData = POINTER_SHIFT(rowData, rowLen);
    dataLen += rowLen;
  }

  taosMemoryFree(pCol);

  blk->uid = htobe64(uid);
  blk->suid = htobe64(suid);
  blk->sversion = htonl(pTableMeta->sversion);
  blk->schemaLen = htonl(schemaLen);
2917
  blk->numOfRows = htonl(rows);
wmmhello's avatar
wmmhello 已提交
2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930
  blk->dataLen = htonl(dataLen);
  subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
  subReq->numOfBlocks = 1;

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    uError("create SQuery error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
L
Liu Jicong 已提交
2931
  pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
wmmhello's avatar
wmmhello 已提交
2932 2933 2934 2935 2936
  if (NULL == pQuery->pRoot) {
    uError("create pQuery->pRoot error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
L
Liu Jicong 已提交
2937
  SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
wmmhello's avatar
wmmhello 已提交
2938 2939 2940
  nodeStmt->payloadType = PAYLOAD_TYPE_KV;
  nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);

L
Liu Jicong 已提交
2941
  SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
wmmhello's avatar
wmmhello 已提交
2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954
  if (NULL == dst) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto end;
  }
  dst->vg = vgData;
  dst->numOfTables = subReq->numOfBlocks;
  dst->size = subReq->length;
  dst->pData = (char*)subReq;
  subReq->header.vgId = htonl(dst->vg.vgId);
  subReq->version = htonl(1);
  subReq->header.contLen = htonl(subReq->length);
  subReq->length = htonl(subReq->length);
  subReq->numOfBlocks = htonl(subReq->numOfBlocks);
L
Liu Jicong 已提交
2955
  subReq = NULL;  // no need free
wmmhello's avatar
wmmhello 已提交
2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966
  taosArrayPush(nodeStmt->pDataBlocks, &dst);

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  return code;
}

L
Liu Jicong 已提交
2967 2968 2969 2970
static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SHashObj* pVgHash = NULL;
  SQuery*   pQuery = NULL;
wmmhello's avatar
wmmhello 已提交
2971
  SMqRspObj rspObj = {0};
L
Liu Jicong 已提交
2972
  SDecoder  decoder = {0};
2973 2974 2975

  terrno = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
L
Liu Jicong 已提交
2976
  if (!pRequest) {
2977 2978 2979 2980
    uError("WriteRaw:createRequest error request is null");
    return terrno;
  }

wmmhello's avatar
wmmhello 已提交
2981 2982 2983 2984 2985
  rspObj.resIter = -1;
  rspObj.resType = RES_TYPE__TMQ;

  tDecoderInit(&decoder, data, dataLen);
  code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
L
Liu Jicong 已提交
2986
  if (code != 0) {
wmmhello's avatar
wmmhello 已提交
2987 2988 2989 2990 2991
    uError("WriteRaw:decode smqDataRsp error");
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

2992 2993 2994 2995 2996 2997 2998 2999
  if (!pRequest->pDb) {
    uError("WriteRaw:not use db");
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  taosHashSetFreeFp(pVgHash, destroyVgHash);
L
Liu Jicong 已提交
3000
  struct SCatalog* pCatalog = NULL;
3001
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
L
Liu Jicong 已提交
3002
  if (code != TSDB_CODE_SUCCESS) {
3003 3004 3005 3006 3007 3008 3009 3010 3011
    uError("WriteRaw: get gatlog error");
    goto end;
  }

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
wmmhello's avatar
wmmhello 已提交
3012 3013 3014 3015 3016 3017

  printf("raw data block num:%d\n", rspObj.rsp.blockNum);
  while (++rspObj.resIter < rspObj.rsp.blockNum) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
    if (!rspObj.rsp.withSchema) {
      uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
3018 3019
      goto end;
    }
wmmhello's avatar
wmmhello 已提交
3020 3021
    SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
    setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
3022

wmmhello's avatar
wmmhello 已提交
3023
    code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
L
Liu Jicong 已提交
3024
    if (code != TSDB_CODE_SUCCESS) {
3025 3026 3027 3028 3029
      uError("WriteRaw: setQueryResultFromRsp error");
      goto end;
    }

    uint16_t fLen = 0;
L
Liu Jicong 已提交
3030 3031
    int32_t  rowSize = 0;
    int16_t  nVar = 0;
3032
    for (int i = 0; i < pSW->nCols; i++) {
L
Liu Jicong 已提交
3033
      SSchema* schema = pSW->pSchema + i;
3034 3035
      fLen += TYPE_BYTES[schema->type];
      rowSize += schema->bytes;
L
Liu Jicong 已提交
3036 3037
      if (IS_VAR_DATA_TYPE(schema->type)) {
        nVar++;
3038 3039 3040
      }
    }

wmmhello's avatar
wmmhello 已提交
3041
    int32_t rows = rspObj.resInfo.numOfRows;
3042 3043 3044 3045 3046
    int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
                              (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1);
    int32_t schemaLen = 0;
    int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;

wmmhello's avatar
wmmhello 已提交
3047
    const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
L
Liu Jicong 已提交
3048
    if (!tbName) {
3049 3050 3051 3052 3053
      uError("WriteRaw: tbname is null");
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }

3054
    printf("raw data tbname:%s\n", tbName);
3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067
    SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
    strcpy(pName.dbname, pRequest->pDb);
    strcpy(pName.tname, tbName);

    VgData vgData = {0};
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
      goto end;
    }

    SSubmitReq* subReq = NULL;
    SSubmitBlk* blk = NULL;
L
Liu Jicong 已提交
3068 3069
    void*       hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
    if (hData) {
3070 3071 3072
      vgData = *(VgData*)hData;

      int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
L
Liu Jicong 已提交
3073
      void*   tmp = taosMemoryRealloc(vgData.data, totalLen);
3074 3075 3076 3077 3078 3079 3080 3081
      if (tmp == NULL) {
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto end;
      }
      vgData.data = tmp;
      ((VgData*)hData)->data = tmp;
      subReq = (SSubmitReq*)(vgData.data);
      blk = POINTER_SHIFT(vgData.data, subReq->length);
L
Liu Jicong 已提交
3082
    } else {
3083
      int32_t totalLen = sizeof(SSubmitReq) + submitLen;
L
Liu Jicong 已提交
3084
      void*   tmp = taosMemoryCalloc(1, totalLen);
3085 3086 3087 3088 3089
      if (tmp == NULL) {
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto end;
      }
      vgData.data = tmp;
L
Liu Jicong 已提交
3090
      taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
3091 3092 3093 3094 3095 3096 3097
      subReq = (SSubmitReq*)(vgData.data);
      subReq->length = sizeof(SSubmitReq);
      subReq->numOfBlocks = 0;

      blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
    }

3098 3099
    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
3100 3101 3102 3103
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName);
      goto end;
    }
3104 3105 3106
    uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
    uint64_t uid = pTableMeta->uid;
    taosMemoryFreeClear(pTableMeta);
3107

L
Liu Jicong 已提交
3108
    void*   blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
3109 3110 3111 3112 3113 3114 3115 3116 3117 3118
    STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);

    SRowBuilder rb = {0};
    tdSRowInit(&rb, pSW->version);
    tdSRowSetTpInfo(&rb, pSW->nCols, fLen);
    int32_t dataLen = 0;

    for (int32_t j = 0; j < rows; j++) {
      tdSRowResetBuf(&rb, rowData);

wmmhello's avatar
wmmhello 已提交
3119 3120
      doSetOneRowPtr(&rspObj.resInfo);
      rspObj.resInfo.current += 1;
3121 3122 3123

      int32_t offset = 0;
      for (int32_t k = 0; k < pSW->nCols; k++) {
L
Liu Jicong 已提交
3124 3125
        const SSchema* pColumn = &pSW->pSchema[k];
        char*          data = rspObj.resInfo.row[k];
3126 3127 3128
        if (!data) {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        } else {
L
Liu Jicong 已提交
3129
          if (IS_VAR_DATA_TYPE(pColumn->type)) {
3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144
            data -= VARSTR_HEADER_SIZE;
          }
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        }
        offset += TYPE_BYTES[pColumn->type];
      }
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
      dataLen += rowLen;
    }

    blk->uid = htobe64(uid);
    blk->suid = htobe64(suid);
    blk->sversion = htonl(pSW->version);
    blk->schemaLen = htonl(schemaLen);
3145
    blk->numOfRows = htonl(rows);
3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159
    blk->dataLen = htonl(dataLen);
    subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
    subReq->numOfBlocks++;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    uError("create SQuery error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
L
Liu Jicong 已提交
3160
  pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
3161 3162 3163 3164 3165
  if (NULL == pQuery->pRoot) {
    uError("create pQuery->pRoot error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
L
Liu Jicong 已提交
3166
  SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
3167 3168 3169 3170 3171
  nodeStmt->payloadType = PAYLOAD_TYPE_KV;

  int32_t numOfVg = taosHashGetSize(pVgHash);
  nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);

L
Liu Jicong 已提交
3172
  VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
3173
  while (vData) {
L
Liu Jicong 已提交
3174
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
3175 3176 3177 3178
    if (NULL == dst) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto end;
    }
3179 3180
    dst->vg = vData->vg;
    SSubmitReq* subReq = (SSubmitReq*)(vData->data);
3181 3182 3183
    dst->numOfTables = subReq->numOfBlocks;
    dst->size = subReq->length;
    dst->pData = (char*)subReq;
L
Liu Jicong 已提交
3184
    vData->data = NULL;  // no need free
3185 3186 3187 3188 3189 3190
    subReq->header.vgId = htonl(dst->vg.vgId);
    subReq->version = htonl(1);
    subReq->header.contLen = htonl(subReq->length);
    subReq->length = htonl(subReq->length);
    subReq->numOfBlocks = htonl(subReq->numOfBlocks);
    taosArrayPush(nodeStmt->pDataBlocks, &dst);
L
Liu Jicong 已提交
3191
    vData = (VgData*)taosHashIterate(pVgHash, vData);
3192 3193 3194 3195
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;
wmmhello's avatar
wmmhello 已提交
3196

3197
end:
wmmhello's avatar
wmmhello 已提交
3198 3199
  tDecoderClear(&decoder);
  taos_free_result(&rspObj);
3200 3201 3202 3203
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  return code;
wmmhello's avatar
wmmhello 已提交
3204 3205
}

wmmhello's avatar
wmmhello 已提交
3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229
char* tmq_get_json_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
  if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) {
    return processCreateStb(&pMetaRspObj->metaRsp);
  } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) {
    return processAlterStb(&pMetaRspObj->metaRsp);
  } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) {
    return processDropSTable(&pMetaRspObj->metaRsp);
  } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) {
    return processCreateTable(&pMetaRspObj->metaRsp);
  } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) {
    return processAlterTable(&pMetaRspObj->metaRsp);
  } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
    return processDropTable(&pMetaRspObj->metaRsp);
  }
  return NULL;
}

void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }

L
Liu Jicong 已提交
3230 3231
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (!raw || !res) {
wmmhello's avatar
wmmhello 已提交
3232 3233 3234 3235 3236 3237 3238
    return TSDB_CODE_INVALID_PARA;
  }
  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    raw->raw = pMetaRspObj->metaRsp.metaRsp;
    raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
    raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
L
Liu Jicong 已提交
3239 3240
  } else if (TD_RES_TMQ(res)) {
    SMqRspObj* rspObj = ((SMqRspObj*)res);
wmmhello's avatar
wmmhello 已提交
3241 3242 3243 3244 3245 3246 3247 3248

    int32_t len = 0;
    int32_t code = 0;
    tEncodeSize(tEncodeSMqDataRsp, &rspObj->rsp, len, code);
    if (code < 0) {
      return -1;
    }

L
Liu Jicong 已提交
3249
    void*    buf = taosMemoryCalloc(1, len);
wmmhello's avatar
wmmhello 已提交
3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264
    SEncoder encoder = {0};
    tEncoderInit(&encoder, buf, len);
    tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
    tEncoderClear(&encoder);

    raw->raw = buf;
    raw->raw_len = len;
    raw->raw_type = RES_TYPE__TMQ;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  return TSDB_CODE_SUCCESS;
}

void tmq_free_raw(tmq_raw_data raw) {
L
Liu Jicong 已提交
3265
  if (raw.raw_type == RES_TYPE__TMQ) {
wmmhello's avatar
wmmhello 已提交
3266 3267 3268 3269
    taosMemoryFree(raw.raw);
  }
}

L
Liu Jicong 已提交
3270
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
wmmhello's avatar
wmmhello 已提交
3271 3272 3273 3274
  if (!taos) {
    return TSDB_CODE_INVALID_PARA;
  }

L
Liu Jicong 已提交
3275
  if (raw.raw_type == TDMT_VND_CREATE_STB) {
wmmhello's avatar
wmmhello 已提交
3276
    return taosCreateStb(taos, raw.raw, raw.raw_len);
L
Liu Jicong 已提交
3277
  } else if (raw.raw_type == TDMT_VND_ALTER_STB) {
wmmhello's avatar
wmmhello 已提交
3278
    return taosCreateStb(taos, raw.raw, raw.raw_len);
L
Liu Jicong 已提交
3279
  } else if (raw.raw_type == TDMT_VND_DROP_STB) {
wmmhello's avatar
wmmhello 已提交
3280
    return taosDropStb(taos, raw.raw, raw.raw_len);
L
Liu Jicong 已提交
3281
  } else if (raw.raw_type == TDMT_VND_CREATE_TABLE) {
wmmhello's avatar
wmmhello 已提交
3282
    return taosCreateTable(taos, raw.raw, raw.raw_len);
L
Liu Jicong 已提交
3283
  } else if (raw.raw_type == TDMT_VND_ALTER_TABLE) {
wmmhello's avatar
wmmhello 已提交
3284
    return taosAlterTable(taos, raw.raw, raw.raw_len);
L
Liu Jicong 已提交
3285
  } else if (raw.raw_type == TDMT_VND_DROP_TABLE) {
wmmhello's avatar
wmmhello 已提交
3286
    return taosDropTable(taos, raw.raw, raw.raw_len);
L
Liu Jicong 已提交
3287
  } else if (raw.raw_type == TDMT_VND_DELETE) {
wmmhello's avatar
wmmhello 已提交
3288
    return taosDeleteData(taos, raw.raw, raw.raw_len);
L
Liu Jicong 已提交
3289
  } else if (raw.raw_type == RES_TYPE__TMQ) {
wmmhello's avatar
wmmhello 已提交
3290 3291 3292 3293 3294
    return tmqWriteRaw(taos, raw.raw, raw.raw_len);
  }
  return TSDB_CODE_INVALID_PARA;
}

L
Liu Jicong 已提交
3295
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
3296
  //
L
Liu Jicong 已提交
3297
  tmqCommitInner(tmq, msg, 0, 1, cb, param);
L
Liu Jicong 已提交
3298 3299
}

3300 3301
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  //
L
Liu Jicong 已提交
3302
  return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
3303
}