tmq.c 100.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
int32_t tmqAskEp(tmq_t* tmq, bool async);

// Process-wide TMQ state shared by every consumer created in this file.
typedef struct {
  int8_t inited;  // 0 until tmq_consumer_new() wins the compare-exchange and creates the timer, then 1
  tmr_h  timer;   // handle returned by taosTmrInit(); passed to every taosTmrStart/taosTmrReset call in this file
} SMqMgmt;

// Single instance of the shared state, zero-initialised.
static SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
// Leading fields shared by the response wrappers in this file (SMqAskEpRspWrapper and SMqPollRspWrapper start with the
// same two members); tmqClearUnhandleMsg() handles queued items through this type.
typedef struct {
  int8_t  tmqRspType;  // response kind tag, e.g. TMQ_MSG_TYPE__END_RSP
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60 61 62
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
  int8_t  ssEnable;
  int32_t ssBatchSize;

  bool hbBgEnable;

63 64 65 66 67
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
68
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
69
  void*          commitCbUserParam;
L
Liu Jicong 已提交
70 71 72
};

struct tmq_t {
L
Liu Jicong 已提交
73
  // conf
74 75 76 77 78 79 80 81 82 83 84
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
85 86
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
87 88 89 90

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
91 92
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
93
  int32_t epSkipCnt;
L
Liu Jicong 已提交
94
#endif
L
Liu Jicong 已提交
95 96
  int64_t pollCnt;

L
Liu Jicong 已提交
97
  // timer
98 99
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
100 101 102
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
103 104 105 106
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
107
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
108
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
109
  STaosQall*  qall;
L
Liu Jicong 已提交
110 111 112 113
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
114 115
};

X
Xiaoyu Wang 已提交
116 117 118 119 120 121 122 123
// Values for SMqClientVg.vgStatus.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Values for tmq_t.status.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,  // set by tmq_consumer_new()
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,  // stored by tmqPollCb() on TSDB_CODE_TMQ_CONSUMER_MISMATCH
};

L
Liu Jicong 已提交
128
// Task kinds written to tmq_t.delayedTask by the timer callbacks and dispatched by tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
134
typedef struct {
135 136 137
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
138 139
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
140
  // connection info
141
  int32_t vgId;
X
Xiaoyu Wang 已提交
142
  int32_t vgStatus;
L
Liu Jicong 已提交
143
  int32_t vgSkipCnt;
144 145 146
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
147
typedef struct {
148
  // subscribe info
L
Liu Jicong 已提交
149
  char* topicName;
L
Liu Jicong 已提交
150
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
151 152 153

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
154
  SSchemaWrapper schema;
155 156
} SMqClientTopic;

L
Liu Jicong 已提交
157 158 159 160 161
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
162
  union {
L
Liu Jicong 已提交
163 164
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
165
  };
L
Liu Jicong 已提交
166 167
} SMqPollRspWrapper;

L
Liu Jicong 已提交
168
typedef struct {
L
Liu Jicong 已提交
169 170 171
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
172
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
173

L
Liu Jicong 已提交
174
typedef struct {
175
  tmq_t*  tmq;
L
Liu Jicong 已提交
176
  int32_t code;
L
Liu Jicong 已提交
177
  int32_t async;
X
Xiaoyu Wang 已提交
178
  tsem_t  rspSem;
179 180
} SMqAskEpCbParam;

L
Liu Jicong 已提交
181
typedef struct {
L
Liu Jicong 已提交
182 183
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
184
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
185
  int32_t         epoch;
L
Liu Jicong 已提交
186
  int32_t         vgId;
L
Liu Jicong 已提交
187
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
188
} SMqPollCbParam;
189

190
typedef struct {
L
Liu Jicong 已提交
191 192 193
  tmq_t*         tmq;
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
194 195
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
196
  int32_t        rspErr;
197
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
198 199 200 201
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
202 203 204 205 206
} SMqCommitCbParamSet;

// Per-request context passed to tmqCommitCb().
typedef struct {
  SMqCommitCbParamSet* params;   // set shared by every request of the same commit call
  STqOffset*           pOffset;  // offset record built by tmqSendCommitReq() for this request
} SMqCommitCbParam;
208

209
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
210
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
211
  conf->withTbName = false;
L
Liu Jicong 已提交
212
  conf->autoCommit = true;
L
Liu Jicong 已提交
213
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
214
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
215
  conf->hbBgEnable = true;
216 217 218
  return conf;
}

L
Liu Jicong 已提交
219
// Releases a configuration object together with its ip/user/pass strings. A NULL conf is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
229 230
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
231
    return TMQ_CONF_OK;
232
  }
L
Liu Jicong 已提交
233

234 235
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
236 237
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
238

L
Liu Jicong 已提交
239 240
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
241
      conf->autoCommit = true;
L
Liu Jicong 已提交
242 243
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
244
      conf->autoCommit = false;
L
Liu Jicong 已提交
245 246 247 248
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
249
  }
L
Liu Jicong 已提交
250

L
Liu Jicong 已提交
251 252 253 254 255
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
256 257 258 259 260 261 262 263 264 265 266 267 268 269
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
270

271 272
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
273
      conf->withTbName = true;
L
Liu Jicong 已提交
274
      return TMQ_CONF_OK;
275
    } else if (strcmp(value, "false") == 0) {
276
      conf->withTbName = false;
L
Liu Jicong 已提交
277
      return TMQ_CONF_OK;
278 279 280 281 282
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
283
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
284
    if (strcmp(value, "true") == 0) {
285 286 287 288 289 290 291 292 293 294 295 296 297
      conf->ssEnable = true;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
      conf->ssEnable = false;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
298 299
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
300
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
301 302 303 304
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
305
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
306 307
  }

L
Liu Jicong 已提交
308
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
309
    conf->ssBatchSize = atoi(value);
L
Liu Jicong 已提交
310 311 312
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
313
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
314 315 316
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
317
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
318 319 320
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
321
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
322 323 324
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
325
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
326 327 328
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
329
  if (strcmp(key, "td.connect.db") == 0) {
330
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
331 332 333
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
334
  return TMQ_CONF_UNKNOWN;
335 336 337
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
338 339
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
340 341
}

L
Liu Jicong 已提交
342 343
// Appends a heap copy of src to the list. Names that do not start with a backquote are passed through strtolower().
// Returns 0 on success, -1 when list/src is NULL, src is empty, or memory cannot be obtained.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) return -1;

  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  if (taosArrayPush(container, &topic) == NULL) {
    // the list did not take the copy, so release it here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
353
// Destroys the list and frees every stored topic string with taosMemoryFree. A NULL list is ignored.
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;
  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
358 359 360 361 362 363 364 365 366 367
// Number of topics currently stored in the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Exposes the list's backing storage as an array of topic strings; the memory stays owned by the list.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
368 369 370 371
// Writes "<topicName>:<vg>" into dst and returns the character count reported by sprintf.
// dst must be large enough for the result; no bound is enforced here.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

372 373
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
374 375
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
  // push into array
L
Liu Jicong 已提交
376
#if 0
377 378 379 380 381
  if (code == 0) {
    taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
  } else {
    taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
  }
L
Liu Jicong 已提交
382
#endif
L
Liu Jicong 已提交
383

S
Shengliang Guan 已提交
384
  /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
L
Liu Jicong 已提交
385 386
   * pOffset->version);*/

387
  // count down waiting rsp
L
Liu Jicong 已提交
388
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
389 390
  ASSERT(waitingRspNum >= 0);

391 392
  taosMemoryFree(pParam);

393 394 395 396 397
  if (waitingRspNum == 0) {
    // if no more waiting rsp
    if (pParamSet->async) {
      // call async cb func
      if (pParamSet->automatic && pParamSet->tmq->commitCb) {
L
Liu Jicong 已提交
398
        pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
399 400
      } else if (!pParamSet->automatic && pParamSet->userCb) {
        // sem post
L
Liu Jicong 已提交
401
        pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
402
      }
L
Liu Jicong 已提交
403 404
    } else {
      tsem_post(&pParamSet->rspSem);
405 406
    }

407 408
    taosMemoryFree(pParamSet);

L
Liu Jicong 已提交
409
#if 0
410 411
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
L
Liu Jicong 已提交
412
#endif
413 414 415 416
  }
  return 0;
}

L
Liu Jicong 已提交
417 418 419 420 421 422
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
L
Liu Jicong 已提交
423
  pOffset->val = pVg->currentOffset;
424

L
Liu Jicong 已提交
425 426 427 428
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
L
Liu Jicong 已提交
429

L
Liu Jicong 已提交
430 431 432 433 434 435 436 437 438 439
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    return -1;
  }
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) return -1;
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
440

L
Liu Jicong 已提交
441 442 443 444 445 446 447
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
448
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
449 450 451 452 453 454 455 456 457 458 459 460 461 462
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    return -1;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

S
Shengliang Guan 已提交
463 464
  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);
L
Liu Jicong 已提交
465 466

  // TODO: put into cb
L
Liu Jicong 已提交
467
  pVg->committedOffset = pVg->currentOffset;
L
Liu Jicong 已提交
468 469 470 471

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
L
Liu Jicong 已提交
472
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
473
  pMsgSendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
474 475 476
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
  // send msg

L
Liu Jicong 已提交
477 478 479
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

L
Liu Jicong 已提交
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;
}

int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

L
Liu Jicong 已提交
513 514
  int32_t code = -1;

L
Liu Jicong 已提交
515 516 517 518 519 520 521
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

L
Liu Jicong 已提交
522
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
L
Liu Jicong 已提交
523 524
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
L
Liu Jicong 已提交
525
        }
L
Liu Jicong 已提交
526
        goto HANDLE_RSP;
L
Liu Jicong 已提交
527 528
      }
    }
L
Liu Jicong 已提交
529
  }
L
Liu Jicong 已提交
530

L
Liu Jicong 已提交
531 532 533 534
HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
L
Liu Jicong 已提交
535 536 537
    return 0;
  }

L
Liu Jicong 已提交
538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    return code;
  } else {
    code = 0;
  }

FAIL:
  if (code != 0 && async) {
    userCb(tmq, code, userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
554 555
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
L
Liu Jicong 已提交
556 557 558 559 560 561
  int32_t code = -1;

  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

562 563 564 565 566 567 568 569 570 571 572 573 574 575
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
576

S
Shengliang Guan 已提交
577
    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
L
Liu Jicong 已提交
578 579
             (int32_t)taosArrayGetSize(pTopic->vgs));

580
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
581 582
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

S
Shengliang Guan 已提交
583 584
      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);
L
Liu Jicong 已提交
585

L
Liu Jicong 已提交
586 587 588
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer: %ld, vg:%d, current %ld, committed %ld", tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
L
Liu Jicong 已提交
589 590 591
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
592 593 594 595
      }
    }
  }

L
Liu Jicong 已提交
596 597 598 599 600 601
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

602 603 604 605 606 607 608 609 610 611
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
  } else {
    code = 0;
  }

  if (code != 0 && async) {
    if (automatic) {
L
Liu Jicong 已提交
612
      tmq->commitCb(tmq, code, tmq->commitCbUserParam);
613
    } else {
L
Liu Jicong 已提交
614
      userCb(tmq, code, userParam);
615 616 617 618
    }
  }

  if (!async) {
619
#if 0
620 621
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
L
Liu Jicong 已提交
622
#endif
623
  }
624 625 626 627

  return 0;
}

628
void tmqAssignAskEpTask(void* param, void* tmrId) {
L
Liu Jicong 已提交
629
  tmq_t*  tmq = (tmq_t*)param;
630
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
631
  *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
L
Liu Jicong 已提交
632
  taosWriteQitem(tmq->delayedTask, pTaskType);
633
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
634 635 636 637
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
638
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
639 640
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
641
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
642 643 644 645
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
646
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
647 648
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
649
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
650 651
}

652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691
// Heartbeat response callback: the reply carries nothing of interest, so its payload is simply released.
// Always returns 0; param and code are ignored.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL || pMsg->pData == NULL) {
    return 0;
  }
  taosMemoryFree(pMsg->pData);
  return 0;
}

// Timer callback: sends one heartbeat (consumer id + epoch) to the mnode and re-arms itself for one second later.
// When a buffer cannot be allocated the heartbeat is skipped for this round; the timer is re-armed either way.
//   param - the tmq_t the timer was started for
//   tmrId - unused
void tmqSendHbReq(void* param, void* tmrId) {
  // TODO replace with ref
  tmq_t*    tmq = (tmq_t*)param;
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
    // previously execution fell through and dereferenced the NULL sendInfo
    goto OVER;
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
  taosTmrReset(tmqSendHbReq, 1000, tmq, tmqMgmt.timer, &tmq->hbLiveTimer);
}

L
Liu Jicong 已提交
692 693 694 695 696 697 698 699
// Drains tmq->delayedTask and runs each queued task:
//   ASK_EP - tmqAskEp(tmq, true), then re-arms the ep timer for one second later
//   COMMIT - asynchronous automatic commit, then re-arms the commit timer with autoCommitInterval
//   REPORT - nothing
// Returns 0, or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the scratch list cannot be allocated.
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);
      taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
716
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
717
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
718 719 720 721 722 723 724 725
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
726
  msg = NULL;
L
Liu Jicong 已提交
727 728 729 730 731 732 733 734 735 736
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
737
// Subscribe response callback: records the response code in the caller's SMqSubscribeCbParam and wakes the caller
// blocked in tmq_subscribe(). Always returns 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  /*tmq_t* tmq = pParam->tmq;*/
  tsem_post(&pParam->rspSem);
  return 0;
}
744

L
Liu Jicong 已提交
745
// Appends the names of the currently subscribed topics to *topics, creating the list when *topics is NULL.
// Each name is reported without the prefix up to and including its first '.'; a name without '.' is reported whole.
// Returns 0 on success, -1 when the list cannot be created or a name cannot be appended.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (topics == NULL) return -1;

  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) return -1;
  }

  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr() yields NULL when there is no '.', and NULL + 1 must not be passed on
    const char* dot = strchr(topic->topicName, '.');
    const char* name = (dot != NULL) ? dot + 1 : topic->topicName;

    if (tmq_list_append(*topics, name) < 0) return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
756 757 758
// Unsubscribes from everything by subscribing to an empty topic list.
// Returns the result of tmq_subscribe(), or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the list cannot be created.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
763
// Disabled earlier variant of tmq_consumer_new() that took an existing connection. It is compiled out and refers
// to members (epStatus, epSkipCnt, commit_cb) that the current structures do not provide.
#if 0
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  tsem_init(&pTmq->rspSem, 0, 0);

  return pTmq;
}
#endif
L
Liu Jicong 已提交
797

L
Liu Jicong 已提交
798
// Creates a consumer from conf: initialises the shared timer on first use, allocates the handle and its queues,
// copies the configuration, opens the connection and, when enabled, starts the background heartbeat timer.
//   conf      - configuration; its groupId must be non-empty
//   errstr    - unused
//   errstrLen - unused
// Returns the new consumer, or NULL on failure.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here, so log from conf instead of dereferencing it
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/

  // set conf
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->ssEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  if (pTmq->hbBgEnable) {
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
893
// Subscribes the consumer to the topics in topic_list (an empty list unsubscribes, see tmq_unsubscribe).
// Sends the request to the mnode, waits for its response, then polls tmqAskEp() until the consumer is ready and
// finally starts the ep timer and, when auto commit is enabled, the commit timer.
// Returns 0 on success, the mnode's response code when it is non-zero, TSDB_CODE_MND_CONSUMER_NOT_READY when the
// consumer is still not ready after the retries, and -1 for local failures.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      // the array did not take the name, so it would not be released by the cleanup below
      taosMemoryFree(topicFName);
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .tmq = tmq,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent: neither the buffer nor the send info is released here from now on
  buf = NULL;
  sendInfo = NULL;

  // param lives on this stack frame, so wait for the callback before going any further
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // code is 0 at this point; report the failure instead of returning success
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (sendInfo != NULL) taosMemoryFree(sendInfo);  // only still set when the request was never sent
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
994
/*
 * Record the commit callback on a consumer configuration.
 *
 * conf  : configuration object to update
 * cb    : callback pointer, stored in conf->commitCb
 * param : opaque user pointer, stored in conf->commitCbUserParam
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
999

D
dapan1121 已提交
1000
/*
 * Response callback installed by tmqPollImpl for every poll request.
 *
 * param : SMqPollCbParam built by the sender; its fields are copied to locals
 *         and the struct is released right away.
 * pMsg  : response buffer. pMsg->pData is released by this function on every
 *         path (exactly once).
 * code  : result code of the request, 0 on success.
 *
 * On success the payload is decoded into an SMqPollRspWrapper which is pushed
 * onto tmq->mqueue, and tmq->rspSem is posted.
 *
 * Returns 0 when the response was queued or was dropped because it belongs to
 * an earlier epoch; returns -1 on every path that goes through CREATE_MSG_FAIL.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  tmq_t*          tmq = pParam->tmq;
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);

  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    if (pMsg->pData) {
      taosMemoryFree(pMsg->pData);
      // the buffer is gone: clear the pointer so that no later path can free it again
      pMsg->pData = NULL;
    }
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        // pMsg->pData has already been released above; it used to be freed a second time here
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      // queue an END_RSP marker so that the consumer side can observe this error
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    // the response head is copied over the start of the decoded struct after decoding
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);

  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;

CREATE_MSG_FAIL:
  // only hand the vgroup back when the request still belongs to the current epoch
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

L
Liu Jicong 已提交
1090
/*
 * Replace the consumer's topic/vgroup table with the one carried by pRsp.
 *
 * tmq   : consumer whose clientTopics, status and epoch are updated
 * epoch : epoch stored into tmq->epoch once the new table is installed
 * pRsp  : ask-ep response listing the topics and their vgroups
 *
 * The current offset of every known "<topicName>:<vgId>" pair is remembered
 * in a temporary hash and carried over to the matching new vgroup; vgroups
 * without a remembered offset start from tmq->resetOffsetCfg.
 *
 * Returns true when at least one vgroup was added to the new table, false
 * otherwise (including when the temporary containers cannot be allocated).
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // pass 1: remember the current offset of every vgroup we already know about
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        // bounded formatting: the key can never run past vgKey
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  // pass 2: build the new table from the response
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        // keep consuming from where the previous epoch left off
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // NOTE(review): only the outer array is destroyed here; the topicName (strdup) and vgs array of each old
  // entry are not released in this function - confirm whether something else owns them before changing this.
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1173
/*
 * Response callback installed by tmqAskEp.
 *
 * param : SMqAskEpCbParam built by tmqAskEp. In synchronous mode the sender is
 *         woken through pParam->rspSem and releases the struct itself; in
 *         asynchronous mode the struct is released here.
 * pMsg  : response buffer; pMsg->pData is released here before returning.
 * code  : result code of the request, 0 on success. It is also stored in
 *         pParam->code.
 *
 * A response whose epoch is not newer than tmq->epoch is ignored. Otherwise
 * the endpoint table is updated in place (synchronous mode) or the decoded
 * response is queued on tmq->mqueue (asynchronous mode).
 *
 * Returns `code`, or -1 when the queue item cannot be allocated.
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = pParam->tmq;
  int8_t           async = pParam->async;
  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  // the payload has been fully consumed (or is not needed): release it here, as tmqPollCb does for its
  // responses, instead of leaking one buffer per ask-ep round trip
  if (pMsg->pData) {
    taosMemoryFree(pMsg->pData);
    pMsg->pData = NULL;
  }

  if (!async) {
    // the waiter in tmqAskEp owns pParam from this point on; do not touch it after the post
    tsem_post(&pParam->rspSem);
  } else {
    // nobody waits in asynchronous mode: tear down the semaphore created by tmqAskEp with its owner
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1226
/*
 * Send a TDMT_MND_MQ_ASK_EP request to the mnode endpoint set.
 *
 * tmq   : consumer issuing the request
 * async : when false, block until tmqAskEpCb has run and return the code it
 *         stored in the callback parameter; when true, return right after
 *         the request has been handed to the transport.
 *
 * Returns -1 when a local resource cannot be set up, the callback's code in
 * synchronous mode, and 0 in asynchronous mode once the request is submitted.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t      code = 0;
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    // without a usable semaphore the synchronous wait below could never be trusted
    tscError("failed to init ask ep semaphore");
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // tmqAskEpCb posts rspSem and leaves pParam to us in synchronous mode
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // destroy the semaphore before its storage goes away (it used to be freed while still initialised)
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1295 1296
#if 0
// Compiled out. Looks up the vgroup named by `offset` inside the consumer's
// topic table and overwrites its current offset.
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* target = &offset->offset;
  if (strcmp(target->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicNum = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicNum; t++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(topic->topicName, target->topicName) != 0) {
      continue;
    }

    int32_t vgNum = taosArrayGetSize(topic->vgs);
    for (int32_t v = 0; v < vgNum; v++) {
      SMqClientVg* vg = taosArrayGet(topic->vgs, v);
      if (vg->vgId != target->vgId) {
        continue;
      }
      vg->currentOffset = target->offset;
      tmqClearUnhandleMsg(tmq);
      return TMQ_RESP_ERR__SUCCESS;
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
#endif
L
Liu Jicong 已提交
1319

1320
/*
 * Allocate and fill the poll request for one vgroup of one topic.
 *
 * The subscription key written to subKey is
 * "<tmq->groupId><TMQ_SEPARATOR><pTopic->topicName>".
 *
 * Returns the zero-initialised and filled request, or NULL when the
 * allocation fails.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (NULL == pollReq) {
    return NULL;
  }

  // message head (network byte order)
  pollReq->head.contLen = htonl(sizeof(SMqPollReq));
  pollReq->head.vgId = htonl(pVg->vgId);

  // subscription key: group id, separator, topic name
  int32_t groupIdLen = strlen(tmq->groupId);
  char*   cursor = pollReq->subKey;
  memcpy(cursor, tmq->groupId, groupIdLen);
  cursor += groupIdLen;
  *cursor = TMQ_SEPARATOR;
  cursor += 1;
  strcpy(cursor, pTopic->topicName);

  // consumer state
  pollReq->consumerId = tmq->consumerId;
  pollReq->epoch = tmq->epoch;
  pollReq->withTbName = tmq->withTbName;
  pollReq->useSnapshot = tmq->useSnapshot;

  // per-request values
  pollReq->timeout = timeout;
  pollReq->reqOffset = pVg->currentOffset;
  pollReq->reqId = generateRequestId();

  return pollReq;
}

L
Liu Jicong 已提交
1349 1350
/*
 * Build the result object handed to the user for a meta poll response.
 *
 * Topic name, db name and vgId are taken from the wrapper's topic/vgroup
 * handles and the metaRsp struct is byte-copied from the wrapper.
 *
 * Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when the
 * allocation fails.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    // the allocation result used to be dereferenced unchecked
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1360 1361 1362
/*
 * Build the result object handed to the user for a data poll response.
 *
 * Topic name, db name and vgId are taken from the wrapper's topic/vgroup
 * handles, the dataRsp struct is byte-copied from the wrapper and the block
 * iterator starts at -1. When the response does not carry its own schema,
 * the schema stored on the topic handle is installed into resInfo.
 *
 * Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when the
 * allocation fails.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // the allocation result used to be dereferenced unchecked
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1378
/*
 * Send one poll request to every vgroup of every subscribed topic that is
 * currently idle.
 *
 * A vgroup is claimed by switching its vgStatus from IDLE to WAIT; vgroups in
 * any other state are skipped and their skip counter is incremented.
 *
 * Returns 0 after walking all vgroups, or -1 as soon as a request cannot be
 * allocated (the vgroup is put back to IDLE and tmq->rspSem is posted first).
 */
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // claim the vgroup; anything but IDLE means a request is already outstanding
      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__IDLE) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 skipped);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      // the three allocations are chained: a later one is only attempted when the earlier one succeeded
      SMqPollReq*     pollReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      SMqPollCbParam* cbParam = (pollReq != NULL) ? taosMemoryMalloc(sizeof(SMqPollCbParam)) : NULL;
      SMsgSendInfo*   msgInfo = (cbParam != NULL) ? taosMemoryCalloc(1, sizeof(SMsgSendInfo)) : NULL;
      if (msgInfo == NULL) {
        // release whatever was obtained and give the vgroup back
        if (pollReq != NULL) taosMemoryFree(pollReq);
        if (cbParam != NULL) taosMemoryFree(cbParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      // context handed to tmqPollCb
      cbParam->tmq = tmq;
      cbParam->pTopic = pTopic;
      cbParam->pVg = pVg;
      cbParam->vgId = pVg->vgId;
      cbParam->epoch = tmq->epoch;

      msgInfo->msgInfo = (SDataBuf){
          .pData = pollReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      msgInfo->requestId = pollReq->reqId;
      msgInfo->requestObjRefId = 0;
      msgInfo->param = cbParam;
      msgInfo->fp = tmqPollCb;
      msgInfo->msgType = TDMT_VND_CONSUME;

      char offsetText[80];
      tFormatOffset(offsetText, 80, &pVg->currentOffset);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetText, pollReq->reqId);

      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, msgInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1455 1456
/*
 * Handle a queued response that is not a poll response.
 *
 * Only TMQ_MSG_TYPE__EP_RSP is understood: when its epoch is newer than
 * tmq->epoch the endpoint table is updated and *pReset is set to true,
 * otherwise *pReset is set to false.
 *
 * Returns 0 for an EP_RSP item and -1 for any other type (*pReset untouched).
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewer = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewer) {
    SMqAskEpRspWrapper* epWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tmqUpdateEp(tmq, rspWrapper->epoch, &epWrapper->msg);
  }

  *pReset = isNewer;
  return 0;
}

L
Liu Jicong 已提交
1473
/*
 * Drain queued responses until one of them yields a result object.
 *
 * Items are taken from tmq->qall, which is refilled from tmq->mqueue when it
 * runs empty.
 *   - END_RSP       : terrno is set to TSDB_CODE_TQ_NO_COMMITTED_OFFSET, NULL is returned
 *   - POLL_RSP      : on matching epoch the vgroup offset is advanced and the vgroup set IDLE;
 *                     a response with blocks is returned as SMqRspObj
 *   - POLL_META_RSP : on matching epoch the vgroup offset is advanced and the vgroup set IDLE;
 *                     the response is returned as SMqMetaRspObj
 *   - anything else : passed to tmqHandleNoPollRsp; when it reports a reset
 *                     and pollIfReset is true, tmqPollImpl is called again
 *
 * Returns the result object, or NULL when both queues are empty (or on END_RSP).
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* item = NULL;
    taosGetQitem(tmq->qall, (void**)&item);
    if (item == NULL) {
      // local batch exhausted: pull everything that arrived meanwhile
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&item);
    }
    if (item == NULL) {
      return NULL;
    }

    const int8_t rspType = item->tmqRspType;

    if (rspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(item);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* dataWrapper = (SMqPollRspWrapper*)item;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (dataWrapper->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 dataWrapper->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(dataWrapper);
        continue;
      }

      SMqClientVg* vg = dataWrapper->vgHandle;
      vg->currentOffset = dataWrapper->dataRsp.rspOffset;
      atomic_store_32(&vg->vgStatus, TMQ_VG_STATUS__IDLE);
      if (dataWrapper->dataRsp.blockNum == 0) {
        // nothing to hand out, keep draining
        taosFreeQitem(dataWrapper);
        continue;
      }

      // build rsp
      SMqRspObj* dataObj = tmqBuildRspFromWrapper(dataWrapper);
      taosFreeQitem(dataWrapper);
      return dataObj;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* metaWrapper = (SMqPollRspWrapper*)item;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (metaWrapper->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 metaWrapper->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(metaWrapper);
        continue;
      }

      SMqClientVg* vg = metaWrapper->vgHandle;
      vg->currentOffset.version = metaWrapper->metaRsp.rspOffset;
      vg->currentOffset.type = TMQ_OFFSET__LOG;
      atomic_store_32(&vg->vgStatus, TMQ_VG_STATUS__IDLE);

      // build rsp
      SMqMetaRspObj* metaObj = tmqBuildMetaRspFromWrapper(metaWrapper);
      taosFreeQitem(metaWrapper);
      return metaObj;
    }

    // every remaining type goes through the non-poll handler
    bool wasReset = false;
    tmqHandleNoPollRsp(tmq, item, &wasReset);
    taosFreeQitem(item);
    if (pollIfReset && wasReset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1543
/*
 * Poll the consumer for the next result.
 *
 * tmq     : consumer handle
 * timeout : value compared against taosGetTimestampMs() differences; -1
 *           means no deadline (the loop then wakes up periodically instead
 *           of blocking forever).
 *
 * Returns a result object, or NULL when
 *   - the consumer is still in INIT status,
 *   - recovering the endpoint table keeps reporting "consumer not ready",
 *   - a poll request cannot be sent,
 *   - terrno is TSDB_CODE_TQ_NO_COMMITTED_OFFSET after draining responses,
 *   - the timeout elapsed without any result.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  // in no topic status also need process delayed task
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // wait for what is LEFT of the budget. The elapsed time used to be passed here, which meant a
      // ~0 wait on the first pass (busy spin) and waits that could overshoot the deadline later on.
      // Same "* 1000" scaling as the fixed wait in the branch below.
      tsem_timewait(&tmq->rspSem, (timeout - elapsed) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1598
/*
 * Close a consumer.
 *
 * When the consumer is in READY status its offsets are committed
 * synchronously and it is then subscribed to an empty topic list; the
 * subscribe call is repeated (500 ms apart) while it reports
 * TSDB_CODE_MND_CONSUMER_NOT_READY, up to the retry limit.
 *
 * Returns the commit error when the commit fails, 0 otherwise (the result of
 * the final subscribe call is not propagated).
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    // TODO: free resources
    return 0;
  }

  int32_t code = tmq_commit_sync(tmq, NULL);
  if (code != 0) {
    return code;
  }

  // subscribing to an empty list is how the consumer drops its topics
  tmq_list_t* emptyList = tmq_list_new();
  for (int32_t attempt = 0;; attempt++) {
    code = tmq_subscribe(tmq, emptyList);
    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY || attempt > 5) {
      break;
    }
    taosMsleep(500);
  }
  tmq_list_destroy(emptyList);

  /*return rsp;*/
  return 0;
}
L
Liu Jicong 已提交
1625

L
Liu Jicong 已提交
1626 1627
/*
 * Map an error code to a readable string: 0 gives "success", -1 gives "fail",
 * every other value is forwarded to tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1635

L
Liu Jicong 已提交
1636 1637 1638 1639
/*
 * Classify a result object.
 *
 * Data results give TMQ_RES_DATA. Meta results give TMQ_RES_TABLE_META,
 * except when their resMsgType is TDMT_VND_DELETE, which is reported as
 * TMQ_RES_DATA. Anything else gives TMQ_RES_INVALID.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (!TD_RES_TMQ_META(res)) {
    return TMQ_RES_INVALID;
  }

  SMqMetaRspObj* metaObj = (SMqMetaRspObj*)res;
  return (metaObj->metaRsp.resMsgType == TDMT_VND_DELETE) ? TMQ_RES_DATA : TMQ_RES_TABLE_META;
}

L
Liu Jicong 已提交
1650
/*
 * Return the topic name of a data or meta result.
 *
 * The stored name is returned from the character following its first '.';
 * when the stored name contains no '.', the whole stored name is returned.
 * Returns NULL for any other kind of result object.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; "NULL + 1" must never be formed or returned
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1662 1663 1664 1665
/*
 * Return the database name of a data or meta result with everything up to
 * and including the first '.' stripped. If the stored name contains no '.',
 * the whole stored name is returned instead of an invalid pointer.
 * Returns NULL when `res` is neither a data nor a meta result.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* db = NULL;
  if (TD_RES_TMQ(res)) {
    db = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    db = ((SMqMetaRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; "NULL + 1" must never be handed to the caller
  const char* dot = strchr(db, '.');
  return (dot != NULL) ? dot + 1 : db;
}

L
Liu Jicong 已提交
1674 1675 1676 1677
/*
 * Return the vgId stored in a data or meta result, or -1 when `res` is
 * neither kind of result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) return ((SMqRspObj*)res)->vgId;
  if (TD_RES_TMQ_META(res)) return ((SMqMetaRspObj*)res)->vgId;
  return -1;
}
L
Liu Jicong 已提交
1685 1686 1687 1688 1689 1690 1691 1692

/*
 * Return the table name recorded for the block the result iterator currently
 * points at. Returns NULL when `res` is not a data result, when table names
 * were not requested or not delivered, or when the iterator is out of range.
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) return NULL;

  SMqRspObj* rspObj = (SMqRspObj*)res;
  if (!rspObj->rsp.withTbName) return NULL;
  if (rspObj->rsp.blockTbName == NULL) return NULL;
  if (rspObj->resIter < 0 || rspObj->resIter >= rspObj->rsp.blockNum) return NULL;

  return (const char*)taosArrayGetP(rspObj->rsp.blockTbName, rspObj->resIter);
}
1697

1698 1699
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
                                  int8_t t) {
wmmhello's avatar
wmmhello 已提交
1700 1701 1702 1703 1704 1705 1706
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
wmmhello's avatar
wmmhello 已提交
1707

1708 1709 1710 1711
  //  char uid[32] = {0};
  //  sprintf(uid, "%"PRIi64, id);
  //  cJSON* id_ = cJSON_CreateString(uid);
  //  cJSON_AddItemToObject(json, "id", id_);
wmmhello's avatar
wmmhello 已提交
1712 1713 1714 1715
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
  cJSON_AddItemToObject(json, "tableType", tableType);
1716 1717
  //  cJSON* version = cJSON_CreateNumber(1);
  //  cJSON_AddItemToObject(json, "version", version);
wmmhello's avatar
wmmhello 已提交
1718 1719

  cJSON* columns = cJSON_CreateArray();
1720 1721 1722 1723
  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON*   column = cJSON_CreateObject();
    SSchema* s = schemaRow->pSchema + i;
    cJSON*   cname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1724 1725 1726
    cJSON_AddItemToObject(column, "name", cname);
    cJSON* ctype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(column, "type", ctype);
1727
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1728
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1729
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1730
      cJSON_AddItemToObject(column, "length", cbytes);
1731 1732 1733
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1734 1735
      cJSON_AddItemToObject(column, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1736 1737 1738 1739 1740
    cJSON_AddItemToArray(columns, column);
  }
  cJSON_AddItemToObject(json, "columns", columns);

  cJSON* tags = cJSON_CreateArray();
1741 1742 1743 1744
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON*   tag = cJSON_CreateObject();
    SSchema* s = schemaTag->pSchema + i;
    cJSON*   tname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1745 1746 1747
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(tag, "type", ttype);
1748
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1749
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1750
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1751
      cJSON_AddItemToObject(tag, "length", cbytes);
1752 1753 1754
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1755 1756
      cJSON_AddItemToObject(tag, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1757 1758 1759 1760 1761 1762 1763 1764 1765
    cJSON_AddItemToArray(tags, tag);
  }
  cJSON_AddItemToObject(json, "tags", tags);

  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  return string;
}

1766 1767 1768 1769
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  char*         string = NULL;
wmmhello's avatar
wmmhello 已提交
1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto end;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
1781 1782
  //  cJSON* uid = cJSON_CreateNumber(id);
  //  cJSON_AddItemToObject(json, "uid", uid);
wmmhello's avatar
wmmhello 已提交
1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794
  SName name = {0};
  tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  cJSON* tableName = cJSON_CreateString(name.tname);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  cJSON* alterType = cJSON_CreateNumber(req.alterType);
  cJSON_AddItemToObject(json, "alterType", alterType);
  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
1795 1796
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
1797 1798 1799 1800
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);

1801
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1802
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
1803
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1804
        cJSON_AddItemToObject(json, "colLength", cbytes);
1805 1806 1807
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1808 1809 1810 1811 1812
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
1813 1814 1815
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
1816 1817 1818 1819
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
1820 1821 1822
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
1823 1824 1825
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);
1826
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1827
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
1828
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1829
        cJSON_AddItemToObject(json, "colLength", cbytes);
1830 1831 1832
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1833 1834 1835 1836 1837
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
1838 1839 1840 1841
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      cJSON*      colName = cJSON_CreateString(oldField->name);
wmmhello's avatar
wmmhello 已提交
1842 1843 1844 1845 1846 1847 1848 1849 1850 1851
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(newField->name);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

1852
end:
wmmhello's avatar
wmmhello 已提交
1853 1854 1855 1856 1857
  cJSON_Delete(json);
  tFreeSMAltertbReq(&req);
  return string;
}

1858
/*
 * Decode the create-super-table request carried by `metaRsp` (the payload
 * following the SMsgHead) and render it as JSON via buildCreateTableJson().
 * Returns NULL when decoding fails.
 */
static char* processCreateStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder;
  char*          json = NULL;

  // the request body starts right after the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) >= 0) {
    json = buildCreateTableJson(&stbReq.schemaRow, &stbReq.schemaTag, stbReq.name, stbReq.suid, TSDB_SUPER_TABLE);
  }

  tDecoderClear(&decoder);
  return json;
}

1880
/*
 * Decode the super-table request carried by `metaRsp` (the payload following
 * the SMsgHead) and render its original alter data as JSON via
 * buildAlterSTableJson(). Returns NULL when decoding fails.
 */
static char* processAlterStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq stbReq = {0};
  SDecoder       decoder;
  char*          json = NULL;

  // the request body starts right after the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) >= 0) {
    json = buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen);
  }

  tDecoderClear(&decoder);
  return json;
}

1902
/*
 * Build the unformatted JSON text describing a "create" event for a child
 * table: type, tableName, tableType ("child"), using, tagNum and tags.
 *
 * pTag     encoded tag data of the child table
 * sname    name of the super table, emitted as "using"
 * name     child table name
 * tagName  array of tag names, indexed in parallel with the decoded tag values
 * id       not used by the current implementation
 * tagNum   emitted as "tagNum"
 *
 * Returns the string produced by cJSON_PrintUnformatted(), or NULL when the
 * root object cannot be created. When tag decoding or a buffer allocation
 * fails, the JSON is still emitted with the tags collected so far.
 */
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum) {
  char*   string = NULL;
  SArray* pTagVals = NULL;
  cJSON*  json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON_AddItemToObject(json, "type", cJSON_CreateString("create"));
  cJSON_AddItemToObject(json, "tableName", cJSON_CreateString(name));
  cJSON_AddItemToObject(json, "tableType", cJSON_CreateString("child"));
  cJSON_AddItemToObject(json, "using", cJSON_CreateString(sname));
  cJSON_AddItemToObject(json, "tagNum", cJSON_CreateNumber(tagNum));

  cJSON*  tags = cJSON_CreateArray();
  int32_t code = tTagToValArray(pTag, &pTagVals);
  if (code) {
    goto end;
  }

  if (tTagIsJson(pTag)) {
    // a JSON tag is emitted as a single entry whose value is the rendered JSON text
    STag* p = (STag*)pTag;
    if (p->nTag == 0) {
      goto end;
    }
    char*  pJson = parseTagDatatoJson(pTag);
    cJSON* tag = cJSON_CreateObject();

    char* ptname = taosArrayGet(tagName, 0);
    cJSON_AddItemToObject(tag, "name", cJSON_CreateString(ptname));
    cJSON_AddItemToObject(tag, "type", cJSON_CreateNumber(TSDB_DATA_TYPE_JSON));
    cJSON_AddItemToObject(tag, "value", cJSON_CreateString(pJson));
    cJSON_AddItemToArray(tags, tag);
    taosMemoryFree(pJson);
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);

    cJSON* tag = cJSON_CreateObject();

    char* ptname = taosArrayGet(tagName, i);
    cJSON_AddItemToObject(tag, "name", cJSON_CreateString(ptname));
    cJSON_AddItemToObject(tag, "type", cJSON_CreateNumber(pTagVal->type));

    cJSON* tvalue = NULL;
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
      if (!buf) {
        // `tag` is not yet owned by `tags`; release it here or it leaks
        cJSON_Delete(tag);
        goto end;
      }
      dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
      tvalue = cJSON_CreateString(buf);
      taosMemoryFree(buf);
    } else {
      double val = 0;
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64);
      tvalue = cJSON_CreateNumber(val);
    }

    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
  }

end:
  cJSON_AddItemToObject(json, "tags", tags);
  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  taosArrayDestroy(pTagVals);
  return string;
}

1994
/*
 * Decode the batch create-table request carried by `metaRsp` (the payload
 * following the SMsgHead) and render it as JSON. Child tables go through
 * buildCreateCTableJson(), normal tables through buildCreateTableJson().
 * Returns NULL when decoding fails or the batch holds no child/normal table.
 */
static char* processCreateTable(SMqMetaRsp* metaRsp) {
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq*     pCreateReq;
  char*              string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  // NOTE(review): every iteration overwrites `string`, so with more than one
  // request only the last JSON is returned and the earlier strings appear to be
  // leaked — confirm the intended behaviour and the matching deallocator.
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
                                     pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum);
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
      string =
          buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
    }
  }

_exit:
  // single cleanup point: the decoder is cleared exactly once on every path
  tDecoderClear(&decoder);
  return string;
}

2026 2027 2028 2029
/*
 * Decode the alter-table request carried by `metaRsp` (the payload following
 * the SMsgHead) and render it as unformatted JSON: type, tableName,
 * tableType, alterType plus the per-action column/tag fields.
 *
 * Returns the string produced by cJSON_PrintUnformatted(), or NULL when
 * decoding fails, the root object cannot be created, or the tag-value buffer
 * cannot be allocated. The intermediate JSON tree is always released.
 */
static char* processAlterTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  char*        string = NULL;
  cJSON*       json = NULL;  // declared up front so the cleanup label can always release it

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON_AddItemToObject(json, "type", cJSON_CreateString("alter"));
  cJSON_AddItemToObject(json, "tableName", cJSON_CreateString(vAlterTbReq.tbName));
  // only a tag-value update targets a child table; every other action is on a normal table
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON_AddItemToObject(json, "alterType", cJSON_CreateNumber(vAlterTbReq.action));

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.colName));
      cJSON_AddItemToObject(json, "colType", cJSON_CreateNumber(vAlterTbReq.type));

      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.colName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.colName));
      cJSON_AddItemToObject(json, "colType", cJSON_CreateNumber(vAlterTbReq.colModType));
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON_AddItemToObject(json, "colLength", cJSON_CreateNumber(length));
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.colName));
      cJSON_AddItemToObject(json, "colNewName", cJSON_CreateString(vAlterTbReq.colNewName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON_AddItemToObject(json, "colName", cJSON_CreateString(vAlterTbReq.tagName));

      // an empty JSON tag is reported as a NULL value
      bool isNull = vAlterTbReq.isNull;
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true);
          buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
        } else {
          // Same "+ 3" head-room buildCreateCTableJson() reserves for dataConverToStr();
          // presumably two quote characters plus the terminator — confirm against dataConverToStr.
          buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 3, 1);
          if (buf == NULL) {
            goto _exit;
          }
          dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
        }

        cJSON_AddItemToObject(json, "colValue", cJSON_CreateString(buf));
        taosMemoryFree(buf);
      }

      cJSON_AddItemToObject(json, "colValueNull", cJSON_CreateBool(isNull));
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

_exit:
  // the printed string is independent of the tree, so the tree is always released here
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2139 2140 2141 2142
/*
 * Decode the drop-super-table request carried by `metaRsp` (the payload
 * following the SMsgHead) and render it as unformatted JSON with type "drop",
 * the table name and tableType "super".
 *
 * Returns the string produced by cJSON_PrintUnformatted(), or NULL when
 * decoding fails or the root object cannot be created. The intermediate JSON
 * tree is always released.
 */
static char* processDropSTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  char*        string = NULL;
  cJSON*       json = NULL;  // declared up front so the cleanup label can always release it

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON_AddItemToObject(json, "type", cJSON_CreateString("drop"));
  cJSON_AddItemToObject(json, "tableName", cJSON_CreateString(req.name));
  cJSON_AddItemToObject(json, "tableType", cJSON_CreateString("super"));

  string = cJSON_PrintUnformatted(json);

_exit:
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2170 2171 2172 2173
/*
 * Decode the batch drop-table request carried by `metaRsp` (the payload
 * following the SMsgHead) and render it as unformatted JSON with type "drop"
 * and a "tableNameList" array holding the name of every dropped table.
 *
 * Returns the string produced by cJSON_PrintUnformatted(), or NULL when
 * decoding fails or the root object cannot be created. The intermediate JSON
 * tree is always released.
 */
static char* processDropTable(SMqMetaRsp* metaRsp) {
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  char*            string = NULL;
  cJSON*           json = NULL;  // declared up front so the cleanup label can always release it

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON_AddItemToObject(json, "type", cJSON_CreateString("drop"));

  cJSON* tableNameList = cJSON_CreateArray();
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;

    cJSON_AddItemToArray(tableNameList, cJSON_CreateString(pDropTbReq->name));
  }
  cJSON_AddItemToObject(json, "tableNameList", tableNameList);

  string = cJSON_PrintUnformatted(json);

_exit:
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2210
/*
 * Replay a create-super-table meta message on the connection `taos`.
 *
 * The request following the SMsgHead in `meta` is decoded, converted into an
 * SMCreateStbReq (igExists set, source TD_REQ_FROM_TAOX), serialized and sent
 * as TDMT_MND_CREATE_STB. On success the table's cached meta is removed from
 * the catalog.
 *
 * Returns TSDB_CODE_SUCCESS or an error code: the buildRequest() failure,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 * TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
 * allocation failure, or the code reported by the executed request.
 */
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateStbReq req = {0};
  // zero-initialized: the early "goto end" paths reach tDecoderClear() before tDecoderInit()
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
  if (NULL == pReq.pColumns) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema* pSchema = req.schemaRow.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pColumns, &field);
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  if (NULL == pReq.pTags) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = req.suid;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first pass computes the serialized size, second pass fills the buffer
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta so the next lookup fetches the new definition
    SCatalog* pCatalog = NULL;
    catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveTableMeta(pCatalog, &tableName);
  }

  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}

2298
/*
 * Replay a drop-super-table meta message on the connection `taos`.
 *
 * The request following the SMsgHead in `meta` is decoded, converted into an
 * SMDropStbReq (igNotExists set, source TD_REQ_FROM_TAOX), serialized and
 * sent as TDMT_MND_DROP_STB. On success the table's cached meta is removed
 * from the catalog.
 *
 * Returns TSDB_CODE_SUCCESS or an error code: the buildRequest() failure,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 * TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on
 * allocation failure, or the code reported by the executed request.
 */
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropStbReq req = {0};
  // zero-initialized: the early "goto end" paths reach tDecoderClear() before tDecoderInit()
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.suid = req.suid;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  // first pass computes the serialized size, second pass fills the buffer
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta of the table that no longer exists
    SCatalog* pCatalog = NULL;
    catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveTableMeta(pCatalog, &tableName);
  }

  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}

// One batch of create-table requests, stored as a value in the vgroup hash map
// built by taosCreateTable().
typedef struct SVgroupCreateTableBatch {
  SVCreateTbBatchReq req;   // batched request; its pArray is released by destroyCreateTbReqBatch()
  SVgroupInfo        info;  // presumably the vgroup the batch is sent to — TODO confirm against taosCreateTable
  char               dbName[TSDB_DB_NAME_LEN];  // presumably the database the tables belong to — TODO confirm
} SVgroupCreateTableBatch;

static void destroyCreateTbReqBatch(void* data) {
2373
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
wmmhello's avatar
wmmhello 已提交
2374 2375 2376
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2377 2378 2379 2380 2381 2382 2383
// Replays a create-table meta message against the database currently selected on `taos`.
//
// `meta` is an SMsgHead followed by an encoded SVCreateTbBatchReq. Every decoded request is routed
// to its vgroup through the catalog, requests are grouped per vgroup, serialized and executed as a
// single TDMT_VND_CREATE_TABLE query. When the query succeeds, removeMeta() is called for every
// table name that was collected in pRequest->tableList.
//
// taos     connection handle; dereferenced as an int64_t reference id
// meta     raw message buffer starting with an SMsgHead
// metaLen  total size of `meta` in bytes
// returns  TSDB_CODE_SUCCESS, or the error code of the first failing step
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  // a buffer shorter than the message head would give the decoder a negative length
  if (NULL == taos || NULL == meta || metaLen < (int32_t)sizeof(SMsgHead)) {
    return TSDB_CODE_INVALID_PARA;
  }

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  if (NULL == pRequest->tableList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
    if (NULL == taosArrayPush(pRequest->tableList, &pName)) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto end;
    }

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      // bounded copy: dbName is a fixed TSDB_DB_NAME_LEN buffer
      snprintf(tBatch.dbName, sizeof(tBatch.dbName), "%s", pRequest->pDb);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      // until the batch is stored in the map nobody else owns pArray, so release it here on failure
      if (NULL == taosArrayPush(tBatch.req.pArray, pCreateReq) ||
          0 != taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch))) {
        taosArrayDestroy(tBatch.req.pArray);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
    } else {  // add to the correct vgroup
      if (NULL == taosArrayPush(pTableBatch->req.pArray, pCreateReq)) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
    }
  }

  SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    // NOTE(review): pBufArray is not released on this path; it presumably needs an element-wise
    // free that is not visible from here - confirm against serializeVgroupsCreateTableBatch.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    removeMeta(pTscObj, pRequest->tableList);
  }

  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

// Per-vgroup accumulator used while replaying a drop-table batch: taosDropTable keeps one of
// these per vgId in its vgroup hash map.
typedef struct SVgroupDropTableBatch {
  SVDropTbBatchReq req;   // req.pArray collects the SVDropTbReq entries routed to this vgroup
  SVgroupInfo      info;  // vgroup returned by catalogGetTableHashVgroup for those tables
  char             dbName[TSDB_DB_NAME_LEN];  // left zeroed by taosDropTable (only req and info are filled there)
} SVgroupDropTableBatch;

static void destroyDropTbReqBatch(void* data) {
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2496 2497 2498 2499 2500 2501 2502
// Replays a drop-table meta message against the database currently selected on `taos`.
//
// `meta` is an SMsgHead followed by an encoded SVDropTbBatchReq. Every decoded request is forced
// to igNotExists = true, routed to its vgroup through the catalog, grouped per vgroup, serialized
// and executed as a single TDMT_VND_DROP_TABLE query. When the query succeeds, removeMeta() is
// called for every table name that was collected in pRequest->tableList.
//
// taos     connection handle; dereferenced as an int64_t reference id
// meta     raw message buffer starting with an SMsgHead
// metaLen  total size of `meta` in bytes
// returns  TSDB_CODE_SUCCESS, or the error code of the first failing step
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  // a buffer shorter than the message head would give the decoder a negative length
  if (NULL == taos || NULL == meta || metaLen < (int32_t)sizeof(SMsgHead)) {
    return TSDB_CODE_INVALID_PARA;
  }

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  if (NULL == pRequest->tableList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // loop to drop table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    if (NULL == taosArrayPush(pRequest->tableList, &pName)) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto end;
    }

    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      // until the batch is stored in the map nobody else owns pArray, so release it here on failure
      if (NULL == taosArrayPush(tBatch.req.pArray, pDropReq) ||
          0 != taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch))) {
        taosArrayDestroy(tBatch.req.pArray);
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
    } else {  // add to the correct vgroup
      if (NULL == taosArrayPush(pTableBatch->req.pArray, pDropReq)) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
    }
  }

  SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    // NOTE(review): pBufArray is not released on this path; it presumably needs an element-wise
    // free that is not visible from here - confirm against serializeVgroupsDropTableBatch.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    removeMeta(pTscObj, pRequest->tableList);
  }
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

wmmhello's avatar
wmmhello 已提交
2601 2602
// delete from db.tabl where ..       -> delete from tabl where ..
// delete from db    .tabl where ..   -> delete from tabl where ..
// static void getTbName(char *sql){
//  char *ch = sql;
//
//  bool inBackQuote = false;
//  int8_t dotIndex = 0;
//  while(*ch != '\0'){
//    if(!inBackQuote && *ch == '`'){
//      inBackQuote = true;
//      ch++;
//      continue;
//    }
//
//    if(inBackQuote && *ch == '`'){
//      inBackQuote = false;
//      ch++;
//
//      continue;
//    }
//
//    if(!inBackQuote && *ch == '.'){
//      dotIndex ++;
//      if(dotIndex == 2){
//        memmove(sql, ch + 1, strlen(ch + 1) + 1);
//        break;
//      }
//    }
//    ch++;
//  }
//}
wmmhello's avatar
wmmhello 已提交
2632 2633

// Replays a delete meta message by turning it back into a SQL DELETE statement.
//
// `meta` is an SMsgHead followed by an encoded SDeleteRes. The decoded table name, timestamp
// column name and [skey, ekey] range are formatted into
//   delete from `<table>` where `<ts>` >= skey and `<ts>` <= ekey
// and executed with taos_query(). TSDB_CODE_PAR_TABLE_NOT_EXIST is mapped to success.
//
// taos     connection handle passed through to taos_query()
// meta     raw message buffer starting with an SMsgHead
// metaLen  total size of `meta` in bytes
// returns  TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for an undecodable or oversized request,
//          or the error code reported by the query
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  int32_t    code = TSDB_CODE_SUCCESS;

  // a buffer shorter than the message head would give the decoder a negative length
  if (NULL == meta || metaLen < (int32_t)sizeof(SMsgHead)) {
    return TSDB_CODE_INVALID_PARA;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  //  getTbName(req.tableFName);
  // The statement embeds one table name and the column name twice; format with a bound and
  // refuse to run a truncated statement instead of overflowing the stack buffer.
  char    sql[1024] = {0};
  int32_t sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  if (sqlLen < 0 || sqlLen >= (int32_t)sizeof(sql)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  printf("delete sql:%s\n", sql);

  TAOS_RES* res = taos_query(taos, sql);
  if (NULL == res) {
    // no request object was produced, so there is no per-request code to read
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
    goto end;
  }
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);

end:
  tDecoderClear(&coder);
  return code;
}

2666 2667 2668 2669 2670 2671 2672 2673
// Replays an alter-table meta message against the database currently selected on `taos`.
//
// `meta` is an SMsgHead followed by an encoded SVAlterTbReq. The request is decoded only to learn
// the action and the table name; the message itself is copied verbatim (with the head's vgId
// rewritten to the target vgroup) and executed as a TDMT_VND_ALTER_TABLE query.
// TSDB_ALTER_TABLE_UPDATE_OPTIONS requests are skipped and reported as success, and
// TSDB_CODE_VND_TABLE_NOT_EXIST is mapped to success. If the query succeeded and produced an
// execution result, that result is passed to handleAlterTbExecRes().
//
// taos     connection handle; dereferenced as an int64_t reference id
// meta     raw message buffer starting with an SMsgHead
// metaLen  total size of `meta` in bytes
// returns  TSDB_CODE_SUCCESS, or the error code of the first failing step
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       coder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  // a buffer shorter than the message head would give the decoder a negative length
  if (NULL == taos || NULL == meta || metaLen < (int32_t)sizeof(SMsgHead)) {
    return TSDB_CODE_INVALID_PARA;
  }

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);

  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pArray = taosArrayInit(1, sizeof(void*));
  if (NULL == pArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == pVgData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pVgData->vg = pInfo;
  pVgData->pData = taosMemoryMalloc(metaLen);
  if (NULL == pVgData->pData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // forward the original message unchanged except for the destination vgroup in its head
  memcpy(pVgData->pData, meta, metaLen);
  ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
  pVgData->size = metaLen;
  pVgData->numOfTables = 1;
  if (NULL == taosArrayPush(pArray, &pVgData)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);

  code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // from here on the cleanup below must not touch the array or the data block again
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

L
Liu Jicong 已提交
2778
typedef struct {
2779
  SVgroupInfo vg;
L
Liu Jicong 已提交
2780 2781
  void*       data;
} VgData;
2782 2783 2784 2785 2786 2787

// Free callback installed on the tmqWriteRaw vgroup hash: releases the submit buffer held by a
// VgData entry and clears the pointer. Entries whose buffer was already handed off hold NULL.
static void destroyVgHash(void* data) {
  VgData* entry = data;
  taosMemoryFreeClear(entry->data);
}

L
Liu Jicong 已提交
2788 2789
// Writes one raw column-format data block into table `tbname` of the database currently selected
// on `taos`.
//
// The table's vgroup and meta are looked up through the catalog. `pData` is then read as: a fixed
// header (int32 + uint64 + one int16/int32 pair per column), an int32 length per column, and for
// every column either a per-row int32 offset array (variable-length types) or a null bitmap
// (fixed-length types) followed by the column payload. Rows are rebuilt one by one into a single
// SSubmitBlk and executed as a TDMT_VND_SUBMIT query.
//
// taos    connection handle; dereferenced as an int64_t reference id
// rows    number of rows contained in pData
// pData   raw block in the layout described above
// tbname  target table name (without database prefix)
// returns TSDB_CODE_SUCCESS, or the error code of the first failing step
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  int32_t        code = TSDB_CODE_SUCCESS;
  STableMeta*    pTableMeta = NULL;
  SQuery*        pQuery = NULL;
  SSubmitReq*    subReq = NULL;  // owned here until it has been attached to pQuery
  SResultColumn* pCol = NULL;
  SRequestObj*   pRequest = NULL;

  if (NULL == taos || NULL == pData || NULL == tbname) {
    return TSDB_CODE_INVALID_PARA;
  }

  pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
  if (!pRequest) {
    uError("WriteRaw:createRequest error request is null");
    code = terrno;
    goto end;
  }

  if (!pRequest->pDb) {
    uError("WriteRaw:not use db");
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  // tbname comes from the caller; refuse names that do not fit the fixed-size field
  if (strlen(tbname) >= sizeof(pName.tname)) {
    uError("WriteRaw:table name too long: %s", tbname);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  strcpy(pName.dbname, pRequest->pDb);
  strcpy(pName.tname, tbname);

  struct SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw: get gatlog error");
    goto end;
  }

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  SVgroupInfo vgData = {0};
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname);
    goto end;
  }

  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
    goto end;
  }
  uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
  uint64_t uid = pTableMeta->uid;
  int32_t  numOfCols = pTableMeta->tableInfo.numOfColumns;

  // size the submit buffer for the worst case of `rows` full-width rows
  uint16_t fLen = 0;
  int32_t  rowSize = 0;
  int16_t  nVar = 0;
  for (int i = 0; i < numOfCols; i++) {
    SSchema* schema = pTableMeta->schema + i;
    fLen += TYPE_BYTES[schema->type];
    rowSize += schema->bytes;
    if (IS_VAR_DATA_TYPE(schema->type)) {
      nVar++;
    }
  }

  int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
                            (int32_t)TD_BITMAP_BYTES(numOfCols - 1);
  int32_t schemaLen = 0;
  int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;

  int32_t totalLen = sizeof(SSubmitReq) + submitLen;
  subReq = taosMemoryCalloc(1, totalLen);
  if (NULL == subReq) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
  void*       blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
  STSRow*     rowData = POINTER_SHIFT(blkSchema, schemaLen);

  SRowBuilder rb = {0};
  tdSRowInit(&rb, pTableMeta->sversion);
  tdSRowSetTpInfo(&rb, numOfCols, fLen);
  int32_t dataLen = 0;

  // skip the block header, then read the per-column length table
  char*    pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
  int32_t* colLength = (int32_t*)pStart;
  pStart += sizeof(int32_t) * numOfCols;

  pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
  if (NULL == pCol) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // locate offsets/bitmap and payload of every column inside pData
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
      pCol[i].offset = (int32_t*)pStart;
      pStart += rows * sizeof(int32_t);
    } else {
      pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(rows);
    }

    pCol[i].pData = pStart;
    pStart += colLength[i];
  }

  // convert the column layout into one STSRow per row
  for (int32_t j = 0; j < rows; j++) {
    tdSRowResetBuf(&rb, rowData);
    int32_t offset = 0;
    for (int32_t k = 0; k < numOfCols; k++) {
      const SSchema* pColumn = &pTableMeta->schema[k];

      if (IS_VAR_DATA_TYPE(pColumn->type)) {
        // an offset of -1 marks a NULL value for variable-length columns
        if (pCol[k].offset[j] != -1) {
          char* data = pCol[k].pData + pCol[k].offset[j];
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        } else {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        }
      } else {
        if (!colDataIsNull_f(pCol[k].nullbitmap, j)) {
          char* data = pCol[k].pData + pColumn->bytes * j;
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        } else {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        }
      }

      offset += TYPE_BYTES[pColumn->type];
    }
    int32_t rowLen = TD_ROW_LEN(rowData);
    rowData = POINTER_SHIFT(rowData, rowLen);
    dataLen += rowLen;
  }

  taosMemoryFreeClear(pCol);

  blk->uid = htobe64(uid);
  blk->suid = htobe64(suid);
  blk->sversion = htonl(pTableMeta->sversion);
  blk->schemaLen = htonl(schemaLen);
  blk->numOfRows = htonl(rows);
  blk->dataLen = htonl(dataLen);
  subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
  subReq->numOfBlocks = 1;

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    uError("create SQuery error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
  pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (NULL == pQuery->pRoot) {
    uError("create pQuery->pRoot error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
  nodeStmt->payloadType = PAYLOAD_TYPE_KV;
  nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == nodeStmt->pDataBlocks) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == dst) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto end;
  }
  dst->vg = vgData;
  dst->numOfTables = subReq->numOfBlocks;
  dst->size = subReq->length;
  dst->pData = (char*)subReq;
  subReq->header.vgId = htonl(dst->vg.vgId);
  subReq->version = htonl(1);
  subReq->header.contLen = htonl(subReq->length);
  subReq->length = htonl(subReq->length);
  subReq->numOfBlocks = htonl(subReq->numOfBlocks);
  if (NULL == taosArrayPush(nodeStmt->pDataBlocks, &dst)) {
    // dst never reached the statement; subReq is still owned locally and is released at `end`
    taosMemoryFree(dst);
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  subReq = NULL;  // no need free

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  taosMemoryFreeClear(pCol);
  taosMemoryFreeClear(subReq);
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}

L
Liu Jicong 已提交
2971 2972 2973 2974
// Writes the data blocks of an encoded SMqDataRsp into the database currently selected on `taos`.
//
// `data` is decoded into a stack SMqRspObj. For every block the schema and table name carried by
// the response are used to rebuild rows, which are appended to one SSubmitReq buffer per target
// vgroup (kept in a hash keyed by vgId). Once all blocks are converted, the per-vgroup buffers are
// handed to a single TDMT_VND_SUBMIT query.
//
// taos     connection handle; dereferenced as an int64_t reference id
// data     encoded SMqDataRsp
// dataLen  size of `data` in bytes
// returns  TSDB_CODE_SUCCESS, or the error code of the first failing step
static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SHashObj* pVgHash = NULL;
  SQuery*   pQuery = NULL;
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  terrno = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
  if (!pRequest) {
    uError("WriteRaw:createRequest error request is null");
    return terrno;
  }

  rspObj.resIter = -1;
  rspObj.resType = RES_TYPE__TMQ;

  tDecoderInit(&decoder, data, dataLen);
  code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
  if (code != 0) {
    uError("WriteRaw:decode smqDataRsp error");
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  if (!pRequest->pDb) {
    uError("WriteRaw:not use db");
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (NULL == pVgHash) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgHash, destroyVgHash);
  struct SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw: get gatlog error");
    goto end;
  }

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  printf("raw data block num:%d\n", rspObj.rsp.blockNum);
  while (++rspObj.resIter < rspObj.rsp.blockNum) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
    if (!rspObj.rsp.withSchema) {
      uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
      // without a schema nothing can be written; do not report this as success
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }
    SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
    setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);

    code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw: setQueryResultFromRsp error");
      goto end;
    }

    // worst-case size of this block's rows inside the submit buffer
    uint16_t fLen = 0;
    int32_t  rowSize = 0;
    int16_t  nVar = 0;
    for (int i = 0; i < pSW->nCols; i++) {
      SSchema* schema = pSW->pSchema + i;
      fLen += TYPE_BYTES[schema->type];
      rowSize += schema->bytes;
      if (IS_VAR_DATA_TYPE(schema->type)) {
        nVar++;
      }
    }

    int32_t rows = rspObj.resInfo.numOfRows;
    int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
                              (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1);
    int32_t schemaLen = 0;
    int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;

    const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
    if (!tbName) {
      uError("WriteRaw: tbname is null");
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }

    printf("raw data tbname:%s\n", tbName);
    SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
    // the name comes from the decoded message; refuse one that does not fit the fixed-size field
    if (strlen(tbName) >= sizeof(pName.tname)) {
      uError("WriteRaw: tbname too long");
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }
    strcpy(pName.dbname, pRequest->pDb);
    strcpy(pName.tname, tbName);

    VgData vgData = {0};
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
      goto end;
    }

    SSubmitReq* subReq = NULL;
    SSubmitBlk* blk = NULL;
    void*       hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
    if (hData) {
      // vgroup already has a buffer: grow it and append the new block at its current end
      vgData = *(VgData*)hData;

      int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
      void*   tmp = taosMemoryRealloc(vgData.data, totalLen);
      if (tmp == NULL) {
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto end;
      }
      vgData.data = tmp;
      ((VgData*)hData)->data = tmp;
      subReq = (SSubmitReq*)(vgData.data);
      blk = POINTER_SHIFT(vgData.data, subReq->length);
    } else {
      // first block for this vgroup: start a new buffer and register it in the hash
      int32_t totalLen = sizeof(SSubmitReq) + submitLen;
      void*   tmp = taosMemoryCalloc(1, totalLen);
      if (tmp == NULL) {
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto end;
      }
      vgData.data = tmp;
      if (0 != taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData,
                           sizeof(vgData))) {
        // the hash did not take the entry, so the buffer is still ours to free
        taosMemoryFree(tmp);
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto end;
      }
      subReq = (SSubmitReq*)(vgData.data);
      subReq->length = sizeof(SSubmitReq);
      subReq->numOfBlocks = 0;

      blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
    }

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName);
      goto end;
    }
    uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
    uint64_t uid = pTableMeta->uid;
    taosMemoryFreeClear(pTableMeta);

    void*   blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
    STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);

    SRowBuilder rb = {0};
    tdSRowInit(&rb, pSW->version);
    tdSRowSetTpInfo(&rb, pSW->nCols, fLen);
    int32_t dataLen = 0;

    for (int32_t j = 0; j < rows; j++) {
      tdSRowResetBuf(&rb, rowData);

      doSetOneRowPtr(&rspObj.resInfo);
      rspObj.resInfo.current += 1;

      int32_t offset = 0;
      for (int32_t k = 0; k < pSW->nCols; k++) {
        const SSchema* pColumn = &pSW->pSchema[k];
        char*          colData = rspObj.resInfo.row[k];
        if (!colData) {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        } else {
          if (IS_VAR_DATA_TYPE(pColumn->type)) {
            // step back onto the length header that precedes the variable-length payload
            colData -= VARSTR_HEADER_SIZE;
          }
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
        }
        offset += TYPE_BYTES[pColumn->type];
      }
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
      dataLen += rowLen;
    }

    blk->uid = htobe64(uid);
    blk->suid = htobe64(suid);
    blk->sversion = htonl(pSW->version);
    blk->schemaLen = htonl(schemaLen);
    blk->numOfRows = htonl(rows);
    blk->dataLen = htonl(dataLen);
    subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
    subReq->numOfBlocks++;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    uError("create SQuery error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
  pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (NULL == pQuery->pRoot) {
    uError("create pQuery->pRoot error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
  nodeStmt->payloadType = PAYLOAD_TYPE_KV;

  int32_t numOfVg = taosHashGetSize(pVgHash);
  nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == nodeStmt->pDataBlocks) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // move every per-vgroup buffer out of the hash and into the statement's data block list
  VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
  while (vData) {
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == dst) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto end;
    }
    dst->vg = vData->vg;
    SSubmitReq* subReq = (SSubmitReq*)(vData->data);
    dst->numOfTables = subReq->numOfBlocks;
    dst->size = subReq->length;
    dst->pData = (char*)subReq;
    subReq->header.vgId = htonl(dst->vg.vgId);
    subReq->version = htonl(1);
    subReq->header.contLen = htonl(subReq->length);
    subReq->length = htonl(subReq->length);
    subReq->numOfBlocks = htonl(subReq->numOfBlocks);
    if (NULL == taosArrayPush(nodeStmt->pDataBlocks, &dst)) {
      // the buffer stays referenced by the hash entry and is released by destroyVgHash
      taosMemoryFree(dst);
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto end;
    }
    vData->data = NULL;  // no need free
    vData = (VgData*)taosHashIterate(pVgHash, vData);
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  tDecoderClear(&decoder);
  taos_free_result(&rspObj);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  return code;
}

wmmhello's avatar
wmmhello 已提交
3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233
// Converts a TMQ meta result into text by dispatching on the message type stored in the result.
// Returns NULL when `res` is not a TMQ meta result or its message type has no handler here.
// The returned string is released with tmq_free_json_meta().
char* tmq_get_json_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  SMqMetaRspObj* metaObj = (SMqMetaRspObj*)res;
  switch (metaObj->metaRsp.resMsgType) {
    case TDMT_VND_CREATE_STB:
      return processCreateStb(&metaObj->metaRsp);
    case TDMT_VND_ALTER_STB:
      return processAlterStb(&metaObj->metaRsp);
    case TDMT_VND_DROP_STB:
      return processDropSTable(&metaObj->metaRsp);
    case TDMT_VND_CREATE_TABLE:
      return processCreateTable(&metaObj->metaRsp);
    case TDMT_VND_ALTER_TABLE:
      return processAlterTable(&metaObj->metaRsp);
    case TDMT_VND_DROP_TABLE:
      return processDropTable(&metaObj->metaRsp);
    default:
      return NULL;
  }
}

/*
 * Releases a JSON meta string by handing it to taosMemoryFreeClear.
 * Presumably `jsonMeta` is a string obtained from tmq_get_json_meta();
 * confirm against callers.
 */
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}

L
Liu Jicong 已提交
3234 3235
/*
 * Exports the payload of a consumed TMQ result into `raw`.
 *
 * Meta result: the fields of `raw` are set from the meta response held by
 * `res` (raw->raw points at the response's own buffer; nothing is allocated
 * here), and raw->raw_type carries the response's message type.
 *
 * Data result: the data response is encoded into a freshly allocated buffer
 * and raw->raw_type is set to RES_TYPE__TMQ, which is the type
 * tmq_free_raw() tests for before freeing raw.raw.
 *
 * Returns:
 *   TSDB_CODE_SUCCESS          on success;
 *   TSDB_CODE_INVALID_PARA     when `res` or `raw` is NULL;
 *   TSDB_CODE_TMQ_INVALID_MSG  when `res` is neither a TMQ meta nor a TMQ
 *                              data result;
 *   -1                         when sizing the encoded response fails or the
 *                              output buffer cannot be allocated.
 * `raw` is not modified on any failure path.
 */
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (!raw || !res) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    raw->raw = pMetaRspObj->metaRsp.metaRsp;
    raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
    raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
    return TSDB_CODE_SUCCESS;
  }

  if (!TD_RES_TMQ(res)) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqRspObj* rspObj = ((SMqRspObj*)res);

  // First pass: compute how many bytes the encoded response needs.
  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeSMqDataRsp, &rspObj->rsp, len, code);
  if (code < 0) {
    return -1;
  }

  void* buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    // Allocation failed: do not hand a NULL buffer to the encoder or caller.
    return -1;
  }

  // Second pass: encode into the buffer sized by the first pass.
  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
  tEncoderClear(&encoder);

  raw->raw = buf;
  raw->raw_len = len;
  raw->raw_type = RES_TYPE__TMQ;
  return TSDB_CODE_SUCCESS;
}

/*
 * Frees raw.raw only when raw.raw_type is RES_TYPE__TMQ. For every other
 * type this function does nothing.
 */
void tmq_free_raw(tmq_raw_data raw) {
  if (raw.raw_type != RES_TYPE__TMQ) {
    return;
  }
  taosMemoryFree(raw.raw);
}

L
Liu Jicong 已提交
3274
/*
 * Applies a raw TMQ payload to the connection `taos`, selecting the handler
 * from raw.raw_type and forwarding raw.raw / raw.raw_len to it.
 *
 * Returns TSDB_CODE_INVALID_PARA when `taos` is NULL or raw.raw_type is not
 * one of the handled types; otherwise returns the selected handler's result.
 */
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  if (!taos) {
    return TSDB_CODE_INVALID_PARA;
  }

  switch (raw.raw_type) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB:
      // Both message types are handed to taosCreateStb.
      // NOTE(review): confirm ALTER_STB is meant to share the create handler.
      return taosCreateStb(taos, raw.raw, raw.raw_len);
    case TDMT_VND_DROP_STB:
      return taosDropStb(taos, raw.raw, raw.raw_len);
    case TDMT_VND_CREATE_TABLE:
      return taosCreateTable(taos, raw.raw, raw.raw_len);
    case TDMT_VND_ALTER_TABLE:
      return taosAlterTable(taos, raw.raw, raw.raw_len);
    case TDMT_VND_DROP_TABLE:
      return taosDropTable(taos, raw.raw, raw.raw_len);
    case TDMT_VND_DELETE:
      return taosDeleteData(taos, raw.raw, raw.raw_len);
    case RES_TYPE__TMQ:
      return tmqWriteRaw(taos, raw.raw, raw.raw_len);
    default:
      return TSDB_CODE_INVALID_PARA;
  }
}

L
Liu Jicong 已提交
3299
/*
 * Forwards a commit request for `msg` to tmqCommitInner together with the
 * caller's callback `cb` and its `param`. The value returned by
 * tmqCommitInner is not propagated.
 *
 * The third and fourth arguments are fixed at 0 and 1; the 1 presumably
 * selects asynchronous mode (tmq_commit_sync passes 0 there) — confirm
 * against tmqCommitInner.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

3304 3305
/*
 * Forwards a commit request for `msg` to tmqCommitInner with no callback and
 * no user parameter, and returns tmqCommitInner's result unchanged.
 *
 * The third and fourth arguments are both fixed at 0; the fourth presumably
 * selects synchronous mode (tmq_commit_async passes 1 there) — confirm
 * against tmqCommitInner.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}