tmq.c 107.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
// Forward declaration: called by tmqHandleAllDelayedTask() below; the definition
// is not in the visible part of this file.
int32_t tmqAskEp(tmq_t* tmq, bool async);

// Process-wide TMQ state (one instance, tmqMgmt, shared by all consumers).
typedef struct {
  // 0 until tmq_consumer_new() claims initialization with an atomic compare-exchange (0 -> 1)
  int8_t inited;
  // timer handle created with taosTmrInit(); passed to the taosTmrReset() calls in this file
  tmr_h  timer;
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
// Common header of the response wrappers in this file: SMqAskEpRspWrapper and
// SMqPollRspWrapper start with the same two fields in the same order, and
// tmqClearUnhandleMsg() handles queued items through this type.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60 61 62
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
  int8_t  ssEnable;
  int32_t ssBatchSize;

  bool hbBgEnable;

63 64 65 66 67
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
68
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
69
  void*          commitCbUserParam;
L
Liu Jicong 已提交
70 71 72
};

struct tmq_t {
L
Liu Jicong 已提交
73
  // conf
74 75 76 77 78 79 80 81 82 83 84
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
85 86
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
87 88 89 90

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
91 92
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
93
  int32_t epSkipCnt;
L
Liu Jicong 已提交
94
#endif
L
Liu Jicong 已提交
95 96
  int64_t pollCnt;

L
Liu Jicong 已提交
97
  // timer
98 99
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
100 101 102
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
103 104 105 106
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
107
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
108
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
109
  STaosQall*  qall;
L
Liu Jicong 已提交
110 111 112 113
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
114 115
};

X
Xiaoyu Wang 已提交
116 117 118 119 120 121 122 123
// Values stored in SMqClientVg.vgStatus.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status values (presumably stored in tmq_t.status -- confirm against users).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
};

// Task types queued on tmq_t.delayedTask by the tmqAssign*Task() timer
// callbacks and dispatched in tmqHandleAllDelayedTask(). Numbering starts at 1.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
133
typedef struct {
134 135 136
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
137 138 139 140
  /*int64_t      committedOffset;*/
  /*int64_t      currentOffset;*/
  STqOffsetVal committedOffsetNew;
  STqOffsetVal currentOffsetNew;
L
Liu Jicong 已提交
141
  // connection info
142
  int32_t vgId;
X
Xiaoyu Wang 已提交
143
  int32_t vgStatus;
L
Liu Jicong 已提交
144
  int32_t vgSkipCnt;
145 146 147
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
148
typedef struct {
149
  // subscribe info
L
Liu Jicong 已提交
150
  char* topicName;
L
Liu Jicong 已提交
151
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
152 153 154

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
155 156
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
  };
L
Liu Jicong 已提交
168 169
} SMqPollRspWrapper;

L
Liu Jicong 已提交
170
typedef struct {
L
Liu Jicong 已提交
171 172 173
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
174
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
175

L
Liu Jicong 已提交
176
typedef struct {
177
  tmq_t*  tmq;
L
Liu Jicong 已提交
178
  int32_t code;
L
Liu Jicong 已提交
179
  int32_t async;
X
Xiaoyu Wang 已提交
180
  tsem_t  rspSem;
181 182
} SMqAskEpCbParam;

L
Liu Jicong 已提交
183
typedef struct {
L
Liu Jicong 已提交
184 185
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
186
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
187
  int32_t         epoch;
L
Liu Jicong 已提交
188
  int32_t         vgId;
L
Liu Jicong 已提交
189
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
190
} SMqPollCbParam;
191

L
Liu Jicong 已提交
192
#if 0
// Disabled: callback context of the disabled tmqCommitCb()/tmqCommitInner() pair.
typedef struct {
  tmq_t*         tmq;
  int8_t         async;
  int8_t         automatic;
  int8_t         freeOffsets;
  tmq_commit_cb* userCb;
  tsem_t         rspSem;
  int32_t        rspErr;
  SArray*        offsets;
  void*          userParam;
} SMqCommitCbParam;
#endif
L
Liu Jicong 已提交
205

206
typedef struct {
L
Liu Jicong 已提交
207 208 209 210
  tmq_t* tmq;
  int8_t automatic;
  int8_t async;
  /*int8_t         freeOffsets;*/
L
Liu Jicong 已提交
211 212
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
213
  int32_t        rspErr;
214
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
215 216 217 218
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
219 220 221 222 223 224 225
} SMqCommitCbParamSet;

// Per-request context passed to tmqCommitCb2(): one instance is allocated in
// tmqSendCommitReq() for every commit request.
typedef struct {
  // state shared by all requests of the same commit call
  SMqCommitCbParamSet* params;
  // heap-allocated offset that was encoded into this request
  STqOffset*           pOffset;
} SMqCommitCbParam2;

226
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
227
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
228
  conf->withTbName = false;
L
Liu Jicong 已提交
229
  conf->autoCommit = true;
L
Liu Jicong 已提交
230
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
231
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
232 233 234
  return conf;
}

L
Liu Jicong 已提交
235
/*
 * Release a configuration object together with the connection strings
 * (ip/user/pass) that tmq_conf_set() duplicated. Passing NULL is a no-op.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
245 246
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
247
    return TMQ_CONF_OK;
248
  }
L
Liu Jicong 已提交
249

250 251
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
252 253
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
254

L
Liu Jicong 已提交
255 256
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
257
      conf->autoCommit = true;
L
Liu Jicong 已提交
258 259
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
260
      conf->autoCommit = false;
L
Liu Jicong 已提交
261 262 263 264
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
265
  }
L
Liu Jicong 已提交
266

L
Liu Jicong 已提交
267 268 269 270 271
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
286

287 288
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
289
      conf->withTbName = true;
L
Liu Jicong 已提交
290
      return TMQ_CONF_OK;
291
    } else if (strcmp(value, "false") == 0) {
292
      conf->withTbName = false;
L
Liu Jicong 已提交
293
      return TMQ_CONF_OK;
294 295 296 297 298
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
299
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
300
    if (strcmp(value, "true") == 0) {
301 302 303 304 305 306 307 308 309 310 311 312 313
      conf->ssEnable = true;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
      conf->ssEnable = false;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
314 315
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
316
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
317 318 319 320
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
321
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
322 323
  }

L
Liu Jicong 已提交
324
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
325
    conf->ssBatchSize = atoi(value);
L
Liu Jicong 已提交
326 327 328
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
329
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
330 331 332
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
333
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
334 335 336
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
337
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
338 339 340
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
341
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
342 343 344
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
345
  if (strcmp(key, "td.connect.db") == 0) {
346
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
347 348 349
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
350
  return TMQ_CONF_UNKNOWN;
351 352 353
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
354 355
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
356 357
}

L
Liu Jicong 已提交
358 359
/*
 * Append a copy of topic name `src` to the list.
 *
 * Names that do not start with a backquote are passed through strtolower();
 * names starting with a backquote are stored unchanged.
 *
 * Returns 0 on success and -1 when `list` is NULL, `src` is NULL or empty, the
 * copy cannot be allocated, or the array push fails. The copy is released on
 * the failure path, so nothing leaks.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) return -1;

  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  if (taosArrayPush(container, &topic) == NULL) {
    // the list did not take ownership; release with the same function tmq_list_destroy() uses
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
369
/*
 * Destroy a topic list and release every topic string stored in it.
 * Passing NULL is a no-op.
 */
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;

  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
374 375 376 377 378 379 380 381 382 383
// Number of topics currently stored in the list, as reported by taosArrayGetSize().
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Expose the list's backing storage as an array of C strings.
// The pointer is the container's own data buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
384 385 386 387
// Format "<topicName>:<vg>" into dst and return the number of characters
// written (excluding the terminator). dst is written with sprintf(), so the
// caller must provide a buffer large enough for the result.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
388
#if 0
// Disabled: callback of the disabled tmqCommitInner(); superseded by tmqCommitCb2().
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code;
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
    }

    if (pParam->freeOffsets) {
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}
#endif
L
Liu Jicong 已提交
410

D
dapan1121 已提交
411
/*
 * Response callback for one offset-commit request.
 *
 * param is the SMqCommitCbParam2 built in tmqSendCommitReq(); it links to the
 * SMqCommitCbParamSet shared by all requests of the same commit call.
 *
 * - A non-zero response code is recorded in the shared rspErr so the waiter or
 *   the user callback sees the failure (previously the code was dropped and
 *   rspErr always stayed 0).
 * - The per-request offset copy is released here; the visible code keeps no
 *   other reference to it and the param itself is freed shallowly.
 * - When the last outstanding response arrives, either the commit callback is
 *   invoked (async) or the blocked caller is woken through rspSem (sync).
 *
 * NOTE(review): pBuf is not released here although tmqHbCb() frees its
 * response buffer -- confirm who owns pBuf->pData for this message type.
 * NOTE(review): for async commits nothing in the visible code frees pParamSet.
 *
 * Always returns 0.
 */
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  // record the failure before the countdown, so whoever observes the count
  // reaching zero also observes the error
  if (code != 0) {
    pParamSet->rspErr = code;
  }

  taosMemoryFree(pParam->pOffset);
  pParam->pOffset = NULL;

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    // if no more waiting rsp
    if (pParamSet->async) {
      // call async cb func
      if (pParamSet->automatic && pParamSet->tmq->commitCb) {
        pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
      } else if (!pParamSet->automatic && pParamSet->userCb) {
        pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
      }
    } else {
      // must be the last access to pParamSet: the woken caller tears it down
      tsem_post(&pParamSet->rspSem);
    }
  }
  return 0;
}

L
Liu Jicong 已提交
452 453 454 455 456 457 458
/*
 * Build and send one offset-commit request for (pTopic, pVg).
 *
 * The committed offset is pVg->currentOffsetNew, keyed by
 * "<groupId><TMQ_SEPARATOR><topicName>". On success the request is counted in
 * pParamSet (waitingRspNum / totalRspNum) and tmqCommitCb2() is called with the
 * response.
 *
 * Returns 0 when the request was handed to the transport, -1 otherwise. On
 * failure every buffer allocated here is released and pParamSet's counters are
 * left untouched (previously the offset, message buffer and callback param
 * leaked, and a failed param allocation was dereferenced).
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  void*              buf = NULL;
  SMqCommitCbParam2* pParam = NULL;
  SMsgSendInfo*      pMsgSendInfo = NULL;
  int32_t            len = 0;
  int32_t            code = 0;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffsetNew;

  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  // the encoded offset follows the message head
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    goto FAIL;
  }

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  // the offset is marked committed before the server has confirmed it
  pVg->committedOffsetNew = pVg->currentOffsetNew;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb2;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // register the request before sending so the callback never sees a negative count
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  // send msg
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  // nothing was handed to the transport: release everything allocated above
  taosMemoryFree(pParam);
  taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

/*
 * Commit the offset of the topic/vgroup that produced `msg`.
 *
 * msg must be a TMQ data or meta result; any other kind yields
 * TSDB_CODE_TMQ_INVALID_MSG. The first vgroup matching the message's topic and
 * vgId whose current offset differs from the committed one gets one commit
 * request.
 *
 * Sync (async == 0): blocks until the response arrives and returns the
 * recorded response error; returns -1 if the request could not be sent
 * (previously 0 was returned in that case).
 * Async: returns 0; a send failure is reported through userCb when it is
 * non-NULL (previously a NULL userCb was called).
 *
 * Returns 0 without sending anything when there is nothing to commit, and -1
 * with terrno = TSDB_CODE_OUT_OF_MEMORY when the shared state cannot be
 * allocated.
 *
 * NOTE(review): on a successful async send nothing in the visible code frees
 * pParamSet after the callback has run.
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic = NULL;
  int32_t vgId = 0;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t sendCode = 0;
  bool    attempted = false;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics) && !attempted; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          sendCode = -1;
        }
        // at most one request is issued per message
        attempted = true;
        break;
      }
    }
  }

  if (sendCode != 0) {
    // tmqSendCommitReq() registers a request in pParamSet only right before
    // sending, so after a failure no callback references pParamSet
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    if (async) {
      if (userCb) userCb(tmq, sendCode, userParam);
      return 0;
    }
    return sendCode;
  }

  if (pParamSet->totalRspNum == 0) {
    // nothing needed committing
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    int32_t code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    // the callback posts the semaphore as its last access, so this is the only owner left
    taosMemoryFree(pParamSet);
    return code;
  }

  return 0;
}

/*
 * Commit offsets.
 *
 * With a non-NULL msg the call is forwarded to tmqCommitMsgImpl(). Otherwise a
 * commit request is sent for every vgroup of every subscribed topic whose
 * current offset differs from the committed one.
 *
 * Sync (async == 0): blocks until all responses arrived and returns the
 * recorded response error, or -1 if that is 0 but at least one request could
 * not be sent (previously 0 was always returned).
 * Async: returns 0; responses are reported by tmqCommitCb2(). If no request
 * could be sent at all, the failure is reported through tmq->commitCb
 * (automatic) or userCb, when that callback is non-NULL.
 *
 * Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the shared state
 * cannot be allocated.
 *
 * NOTE(review): for async commits with at least one request in flight nothing
 * in the visible code frees pParamSet after the last callback.
 */
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                        void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t sendErr = 0;  // -1 once any request could not be sent

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          // keep going with the remaining vgroups, but remember the failure
          sendErr = -1;
        }
      }
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // no request is in flight, so no callback references pParamSet
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);

    if (sendErr == 0) return 0;
    if (!async) return sendErr;

    if (automatic) {
      if (tmq->commitCb) tmq->commitCb(tmq, sendErr, tmq->commitCbUserParam);
    } else if (userCb) {
      userCb(tmq, sendErr, userParam);
    }
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    int32_t code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    // the callback posts the semaphore as its last access, so this is the only owner left
    taosMemoryFree(pParamSet);
    return code != 0 ? code : sendErr;
  }

  return 0;
}

L
Liu Jicong 已提交
663 664
#if 0
// Disabled: earlier commit implementation that sent one request to the mnode;
// superseded by tmqCommitInner2().
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
  int8_t               freeOffsets;
  int32_t              code = -1;

  if (msg == NULL) {
    freeOffsets = 1;
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
    freeOffsets = 0;
    pOffsets = (SArray*)&msg->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  SEncoder encoder;

  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  if (code < 0) {
    goto END;
  }
  int32_t tlen = encoder.pos;
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tEncoderClear(&encoder);
    goto END;
  }
  tEncoderClear(&encoder);

  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->freeOffsets = freeOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  } else {
    code = 0;
  }

  // avoid double free if msg is sent
  buf = NULL;

END:
  if (buf) taosMemoryFree(buf);
  /*if (pParam) taosMemoryFree(pParam);*/
  /*if (sendInfo) taosMemoryFree(sendInfo);*/

  if (code != 0 && async) {
    if (automatic) {
      tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
    } else {
      userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

  if (!async && freeOffsets) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}
#endif
L
Liu Jicong 已提交
779

780
void tmqAssignAskEpTask(void* param, void* tmrId) {
L
Liu Jicong 已提交
781
  tmq_t*  tmq = (tmq_t*)param;
782
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
783
  *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
L
Liu Jicong 已提交
784
  taosWriteQitem(tmq->delayedTask, pTaskType);
785
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
786 787 788 789
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
790
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
791 792
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
793
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
794 795 796 797
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
798
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
799 800
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
801
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
802 803
}

804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
// Heartbeat response callback: the response carries nothing this client uses,
// so its payload buffer (if any) is simply released. param and code are ignored.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }
  if (pMsg->pData != NULL) {
    taosMemoryFree(pMsg->pData);
  }
  return 0;
}

/*
 * Timer callback: send one heartbeat (consumer id + epoch) to the mnode and
 * re-arm the heartbeat timer for 1000 ms later. tmrId is unused.
 *
 * On any allocation failure the heartbeat is skipped for this round, but the
 * timer is still re-armed. Previously a failed send-info allocation freed the
 * request and then continued, dereferencing NULL and handing the freed request
 * to the transport.
 */
void tmqSendHbReq(void* param, void* tmrId) {
  // TODO replace with ref
  tmq_t* tmq = (tmq_t*)param;

  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
    goto OVER;
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
  taosTmrReset(tmqSendHbReq, 1000, tmq, tmqMgmt.timer, &tmq->hbLiveTimer);
}

L
Liu Jicong 已提交
844 845 846 847 848 849 850 851
/*
 * Drain the consumer's delayed-task queue and run every queued task:
 *   ASK_EP - tmqAskEp(tmq, true), then re-arm epTimer (1000 ms)
 *   COMMIT - automatic asynchronous commit of all offsets, then re-arm
 *            commitTimer (autoCommitInterval)
 *   REPORT - currently a no-op
 * Any other task type trips ASSERT(0).
 *
 * Returns 0 on success and -1 when the batch container cannot be allocated;
 * in that case the tasks stay queued (previously the NULL container was used).
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    // reset for every fetch so a stale pointer is never mistaken for a new item
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);
      taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do yet
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
868
/*
 * Discard every response that has not been consumed yet: first the items
 * already batched in tmq->qall, then everything still waiting in tmq->mqueue.
 *
 * The item pointer is reset to NULL before every fetch, as
 * tmqHandleAllDelayedTask() does. The original reused one variable across
 * iterations, so if taosGetQitem() leaves its output untouched on an empty
 * batch (NOTE(review): confirm against tqueue), the item just freed would be
 * freed again.
 *
 * NOTE(review): only the queue item itself is released; whether wrappers own
 * nested allocations that need a deeper free is not visible here.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  // 1. items already moved into the batch
  while (1) {
    SMqRspWrapper* msg = NULL;
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg == NULL) break;
    taosFreeQitem(msg);
  }

  // 2. items still in the response queue
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    SMqRspWrapper* msg = NULL;
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg == NULL) break;
    taosFreeQitem(msg);
  }
}

D
dapan1121 已提交
889
/*
 * Response callback of the subscribe request: store the response code in the
 * SMqSubscribeCbParam passed as `param` and wake the thread waiting on its
 * semaphore. pMsg is not used. Always returns 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  tsem_post(&pParam->rspSem);
  return 0;
}
896

L
Liu Jicong 已提交
897
/*
 * Append the names of all currently subscribed topics to *topics.
 *
 * If *topics is NULL a new list is created and stored there (the caller
 * releases it with tmq_list_destroy()). For every topic the part after the
 * first '.' of the stored name is appended; a name without '.' is appended
 * whole (previously that case computed a pointer from NULL).
 *
 * Returns 0 on success and -1 when the list cannot be created or a name cannot
 * be appended; names appended before the failure stay in the list.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return -1;
    }
  }

  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    const char*     sep = strchr(topic->topicName, '.');
    const char*     name = (sep != NULL) ? sep + 1 : topic->topicName;
    if (tmq_list_append(*topics, name) < 0) {
      return -1;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
908 909 910
/*
 * Drop all subscriptions by subscribing to an empty topic list.
 *
 * Returns the result of tmq_subscribe(), or -1 when the temporary empty list
 * cannot be allocated (previously a NULL list was passed on).
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    return -1;
  }
  int32_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
915
#if 0
// Compiled out: earlier variant of tmq_consumer_new() that took an already
// established connection as its first argument.
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
  if (pTmq == NULL) return NULL;

  pTmq->pTscObj = (STscObj*)conn;

  // initial state
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  tsem_init(&pTmq->rspSem, 0, 0);

  return pTmq;
}
#endif
L
Liu Jicong 已提交
949

L
Liu Jicong 已提交
950
// Create a consumer from conf.
//
// Steps, in order: lazily create the shared TMQ timer (tmqMgmt), allocate the
// consumer, create its topic array and queues, copy the configuration, assign
// a consumer id, initialise the response semaphore, open the connection and,
// when conf->hbBgEnable is set, start the heartbeat timer.
//
// conf->groupId must be non-empty (asserted). conf->user / conf->pass fall
// back to TSDB_DEFAULT_USER / TSDB_DEFAULT_PASS when NULL.
// errstr and errstrLen are not used by this implementation.
//
// Returns the new consumer, or NULL on failure.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here, so only values taken from conf may be logged
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf; the copies are bounded by the size of the destination arrays
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->ssEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  if (pTmq->hbBgEnable) {
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pTmq, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  // release whatever was created before the failure
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1045 1046
#if 0
// Compiled out: commit entry point that forwards to tmqCommitInner2() with the
// consumer's own commit callback and user parameter.
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
}
#endif
L
Liu Jicong 已提交
1050

L
Liu Jicong 已提交
1051
// Subscribe the consumer to the topics in topic_list.
//
// Builds an SCMSubscribeReq holding the full name of every topic, serialises
// it, sends it with message type TDMT_MND_SUBSCRIBE and blocks until
// tmqSubscribeCb posts the response semaphore. On success it then calls
// tmqAskEp() every 500 ms for as long as that returns
// TSDB_CODE_MND_CONSUMER_NOT_READY, and finally starts the ask-ep timer and,
// when auto commit is enabled, the commit timer (each only if not yet started).
//
// Returns 0 on success, the response error code when the request was answered
// with an error, or -1 on a local failure.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  void*           abuf = NULL;
  int32_t         tlen = 0;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
  }

  // first call computes the encoded length, second call fills the buffer
  tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  // lives on this stack frame; safe because we block on rspSem below until the
  // callback has finished with it
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .tmq = tmq,
  };

  // on failure sendInfo and buf are still ours and are released at FAIL
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;

FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (code != 0) {
    // both are non-NULL only when the message was never handed to the sender
    if (sendInfo) taosMemoryFree(sendInfo);
    if (buf) taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1148
// Store the commit callback and the opaque user parameter in the
// configuration object.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1153

D
dapan1121 已提交
1154
// Response callback for a poll request sent to one vgroup.
//
// param is the SMqPollCbParam built by the sender; it is copied into locals
// and freed immediately. pMsg->pData is released by this function on every
// path.
//
// - code != 0: the payload is dropped. For TSDB_CODE_TQ_NO_COMMITTED_OFFSET a
//   TMQ_MSG_TYPE__END_RSP item is additionally queued. The vgroup is set back
//   to idle if the request epoch still equals tmq->epoch; returns -1.
// - response epoch older than tmq->epoch: the payload is dropped, returns 0.
// - otherwise the payload is decoded into an SMqPollRspWrapper which is
//   written to tmq->mqueue; returns 0.
//
// tmq->rspSem is posted on every path.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  tmq_t*          tmq = pParam->tmq;
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);

  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        // the payload has already been released above; it must not be freed again
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // In both branches the body is decoded first and the response head is then
  // copied over the start of the decoded structure.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);

  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;

CREATE_MSG_FAIL:
  // pVg is only touched while the request epoch is still the current one
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

1240 1241 1242 1243 1244
// Replace tmq->clientTopics with the topic/vgroup layout carried by pRsp.
//
// The current offset of every existing vgroup is first stored in a temporary
// hash keyed by "<topicName>:<vgId>". Each vgroup of the new layout then
// starts from the stored offset when its key is found, and otherwise from an
// offset whose type is tmq->resetOffsetCfg.
//
// Afterwards tmq->status becomes TMQ_CONSUMER_STATUS__NO_TOPIC when the new
// topic array is empty and TMQ_CONSUMER_STATUS__READY otherwise, and
// tmq->epoch is set to epoch.
//
// Returns true when at least one vgroup was added, false otherwise (including
// when the new array or the hash could not be created).
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool    hasVg = false;
  int32_t newTopicNum = taosArrayGetSize(pRsp->topics);
  char    key[TSDB_TOPIC_FNAME_LEN + 22];

  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           newTopicNum);

  SArray* newTopics = taosArrayInit(newTopicNum, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* offsetByKey = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (offsetByKey == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // step 1: record the offsets of the vgroups we currently know
  int32_t oldTopicNum = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < oldTopicNum; t++) {
    SMqClientTopic* oldTopic = taosArrayGet(tmq->clientTopics, t);
    if (oldTopic->vgs == NULL) {
      continue;
    }

    int32_t oldVgNum = taosArrayGetSize(oldTopic->vgs);
    tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, oldVgNum);

    for (int32_t v = 0; v < oldVgNum; v++) {
      SMqClientVg* oldVg = taosArrayGet(oldTopic->vgs, v);
      sprintf(key, "%s:%d", oldTopic->topicName, oldVg->vgId);

      char offsetStr[80];
      tFormatOffset(offsetStr, 80, &oldVg->currentOffsetNew);
      tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
               oldVg->vgId, key, offsetStr);

      taosHashPut(offsetByKey, key, strlen(key), &oldVg->currentOffsetNew, sizeof(STqOffsetVal));
    }
  }

  // step 2: build the new topic array from the response
  for (int32_t t = 0; t < newTopicNum; t++) {
    SMqSubTopicEp* topicEp = taosArrayGet(pRsp->topics, t);

    SMqClientTopic newTopic = {0};
    newTopic.schema = topicEp->schema;
    newTopic.topicName = strdup(topicEp->topic);
    tstrncpy(newTopic.db, topicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, newTopic.topicName);

    int32_t newVgNum = taosArrayGetSize(topicEp->vgs);
    newTopic.vgs = taosArrayInit(newVgNum, sizeof(SMqClientVg));

    for (int32_t v = 0; v < newVgNum; v++) {
      SMqSubVgEp* vgEp = taosArrayGet(topicEp->vgs, v);
      sprintf(key, "%s:%d", newTopic.topicName, vgEp->vgId);

      // default to the configured reset policy, override with a stored offset
      STqOffsetVal  startOffset = {.type = tmq->resetOffsetCfg};
      STqOffsetVal* stored = taosHashGet(offsetByKey, key, strlen(key));
      if (stored != NULL) {
        startOffset = *stored;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffsetNew = startOffset,
          .vgId = vgEp->vgId,
          .epSet = vgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(newTopic.vgs, &clientVg);
      hasVg = true;
    }

    taosArrayPush(newTopics, &newTopic);
  }

  // step 3: swap in the new layout and publish status and epoch
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(offsetByKey);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) != 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  }

  atomic_store_32(&tmq->epoch, epoch);
  return hasVg;
}

L
Liu Jicong 已提交
1323
#if 0
// Compiled out: earlier endpoint update that carried plain int64_t offsets and
// rebuilt the "<topicName>:<vgId>" hash separately for every topic.
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool    set = false;
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];

  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);

    topic.schema = pTopicEp->schema;
    taosHashClear(pHash);
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
        tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
        if (vgNumCur == 0) break;

        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
          tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));

    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);

      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
      tscDebug("consumer:%" PRId64 ", (epoch %d) original offset of vgId:%d is %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
      if (pOffset != NULL) {
        offset = *pOffset;
        tscDebug("consumer:%" PRId64 ", (epoch %d) receive offset of vgId:%d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
                 vgKey);
      }
      tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }

    taosArrayPush(newTopics, &topic);
  }

  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}
#endif
X
Xiaoyu Wang 已提交
1409

D
dapan1121 已提交
1410
// Response callback for the ask-endpoint request.
//
// param is the SMqAskEpCbParam created by tmqAskEp(); code is stored in it.
// A response whose epoch is not newer than tmq->epoch is ignored. Otherwise:
//  - synchronous request (async == 0): the response is decoded and applied at
//    once through tmqUpdateEp2();
//  - asynchronous request: the response is decoded into an SMqAskEpRspWrapper
//    that is written to tmq->mqueue, and tmq->rspSem is posted.
//
// On exit the synchronous waiter is woken through pParam->rspSem (it frees
// pParam itself); for an asynchronous request pParam is freed here.
//
// Returns code, or -1 when the queue item could not be allocated.
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = pParam->tmq;
  int8_t           async = pParam->async;
  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp2(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  // Release the response payload, as tmqPollCb does once it has decoded its
  // own payload. NOTE(review): this assumes the sender does not free pData
  // after the callback returns - confirm against the transport layer.
  if (pMsg->pData) taosMemoryFree(pMsg->pData);

  if (!async) {
    tsem_post(&pParam->rspSem);
  } else {
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1463
// Send an ask-endpoint request (TDMT_MND_MQ_ASK_EP) for this consumer.
//
// async == false: blocks until tmqAskEpCb posts the semaphore and returns the
//                 code that the callback stored in the parameter block.
// async == true:  returns 0 right after the request has been handed to the
//                 sender; tmqAskEpCb frees the parameter block.
//
// Returns -1 when a local allocation or the semaphore initialisation fails.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = 0;
  int32_t tlen = sizeof(SMqAskEpReq);

  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  // bounded by the destination array instead of an unchecked strcpy
  tstrncpy(req->cgroup, tmq->groupId, sizeof(req->cgroup));

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;

  // Only the synchronous path ever waits on (or posts) this semaphore, so it
  // is created only there; the asynchronous callback just frees pParam.
  if (!async && tsem_init(&pParam->rspSem, 0, 0) != 0) {
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    if (!async) tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // the semaphore is embedded in pParam, so destroy it before freeing
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1532 1533
#if 0
// Compiled out: set the current offset of the vgroup named by offset and drop
// all responses that are still queued. Fails when the consumer group differs
// or when no matching topic/vgroup is found.
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicNum = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNum; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) != 0) {
      continue;
    }

    int32_t vgNum = taosArrayGetSize(clientTopic->vgs);
    for (int32_t j = 0; j < vgNum; j++) {
      SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
      if (pVg->vgId == pOffset->vgId) {
        pVg->currentOffset = pOffset->offset;
        tmqClearUnhandleMsg(tmq);
        return TMQ_RESP_ERR__SUCCESS;
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
#endif
L
Liu Jicong 已提交
1556

1557
// Allocate and fill the poll request for one vgroup of one topic.
//
// subKey is written as <tmq->groupId><TMQ_SEPARATOR><pTopic->topicName>; the
// request offset is pVg->currentOffsetNew. head.vgId and head.contLen are
// stored through htonl().
//
// Returns the new request, or NULL when the allocation fails.
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pollReq == NULL) return NULL;

  // subscription key: group id, separator, topic name
  int32_t groupLen = strlen(tmq->groupId);
  char*   subKey = pollReq->subKey;
  memcpy(subKey, tmq->groupId, groupLen);
  subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(subKey + groupLen + 1, pTopic->topicName);

  // consumer-level settings
  pollReq->withTbName = tmq->withTbName;
  pollReq->timeout = timeout;
  pollReq->consumerId = tmq->consumerId;
  pollReq->epoch = tmq->epoch;

  // per-vgroup position and request identity
  pollReq->reqOffset = pVg->currentOffsetNew;
  pollReq->reqId = generateRequestId();

  pollReq->useSnapshot = tmq->useSnapshot;

  // message head
  pollReq->head.vgId = htonl(pVg->vgId);
  pollReq->head.contLen = htonl(sizeof(SMqPollReq));

  return pollReq;
}

L
Liu Jicong 已提交
1597 1598
// Build the SMqMetaRspObj for a queued meta response.
//
// Topic name, db name and vgId are taken from the wrapper's topic and vgroup
// handles; metaRsp is copied byte-wise from the wrapper.
//
// Returns the new object, or NULL with terrno set to
// TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    // do not write through a failed allocation
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1608 1609 1610
// Build the SMqRspObj for a queued data response.
//
// Topic name, db name and vgId are taken from the wrapper's topic and vgroup
// handles; dataRsp is copied byte-wise from the wrapper and resIter starts at
// -1. When the response does not carry its own schema (withSchema is false),
// the result schema is set from the topic's schema.
//
// Returns the new object, or NULL with terrno set to
// TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // do not write through a failed allocation
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1626
/**
 * Send one asynchronous poll request to every idle vgroup of every topic the
 * consumer currently holds.
 *
 * A vgroup is claimed by atomically switching its status from IDLE to WAIT.
 * Vgroups that are not idle are skipped and their skip counter is increased.
 *
 * @param tmq     consumer handle
 * @param timeout value forwarded to tmqBuildConsumeReqImpl
 * @return 0 once all vgroups have been visited; -1 if the request, the
 *         callback parameter or the send info could not be allocated (the
 *         vgroup is put back to IDLE and rspSem is posted before returning)
 */
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // claim the vgroup; anything other than IDLE means a poll is already pending
      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__IDLE) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 skipped);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      // allocate request, callback parameter and send info in sequence;
      // each later allocation is only attempted if the earlier one succeeded
      SMqPollReq*     pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      SMqPollCbParam* pParam = NULL;
      SMsgSendInfo*   sendInfo = NULL;
      if (pReq != NULL) {
        pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      }
      if (pParam != NULL) {
        sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      }
      if (sendInfo == NULL) {
        // release only what was actually allocated, then give the vgroup back
        if (pReq != NULL) {
          taosMemoryFree(pReq);
        }
        if (pParam != NULL) {
          taosMemoryFree(pParam);
        }
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      pParam->vgId = pVg->vgId;
      pParam->epoch = tmq->epoch;

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      char offsetText[80];
      tFormatOffset(offsetText, 80, &pVg->currentOffsetNew);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetText, pReq->reqId);

      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1703 1704
/**
 * Handle a queued response that is not a poll response.
 *
 * Only endpoint (EP) responses are accepted. If the response carries an epoch
 * newer than the consumer's, the endpoint set is updated via tmqUpdateEp2 and
 * *pReset is set to true; otherwise *pReset is set to false.
 *
 * @return 0 for an EP response, -1 for any other type (*pReset is left
 *         untouched in that case)
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewerEpoch = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewerEpoch) {
    SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tmqUpdateEp2(tmq, rspWrapper->epoch, &pEpWrapper->msg);
  }
  *pReset = isNewerEpoch;
  return 0;
}

L
Liu Jicong 已提交
1721
/**
 * Drain queued responses until one yields a result object.
 *
 * Items are taken from tmq->qall; when it is empty it is refilled once from
 * tmq->mqueue. Per item type:
 *  - END_RSP: terrno is set to TSDB_CODE_TQ_NO_COMMITTED_OFFSET, NULL returned.
 *  - POLL_RSP / POLL_META_RSP: if the epoch matches the consumer's, the
 *    vgroup offset is advanced, the vgroup is marked IDLE and a result object
 *    is built and returned (empty data responses are dropped); on an epoch
 *    mismatch the item is discarded.
 *  - anything else: handed to tmqHandleNoPollRsp; when that reports a reset
 *    and pollIfReset is set, a new round of polls is sent.
 *
 * @return SMqRspObj* or SMqMetaRspObj* for the caller, or NULL when both
 *         queues are empty or an END_RSP was seen
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      // local batch exhausted: pull everything from the shared queue and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        return NULL;
      }
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffsetNew = pPoll->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
      if (pPoll->dataRsp.blockNum == 0) {
        // nothing to hand to the caller; keep draining
        taosFreeQitem(pPoll);
        continue;
      }

      SMqRspObj* pDataRsp = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pDataRsp;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffsetNew.version = pPoll->metaRsp.rspOffset;
      pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaRsp = tmqBuildMetaRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pMetaRsp;
    }

    // every remaining type goes through the non-poll handler
    bool reset = false;
    tmqHandleNoPollRsp(tmq, pItem, &reset);
    taosFreeQitem(pItem);
    if (pollIfReset && reset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1791
/**
 * Poll the consumer for the next result.
 *
 * Each round runs the delayed tasks, sends polls to idle vgroups and drains
 * the response queues. If nothing is available the call blocks on rspSem and
 * retries until the timeout budget is used up.
 *
 * @param tmq     consumer handle
 * @param timeout overall budget, compared against taosGetTimestampMs()
 *                differences; -1 means retry without a deadline
 * @return a result object, or NULL when the consumer is still in INIT status,
 *         sending polls failed, terrno is TSDB_CODE_TQ_NO_COMMITTED_OFFSET
 *         after draining, or the timeout elapsed
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  // a consumer that is still in INIT status has nothing to poll
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait for the REMAINING budget. The previous code passed the elapsed
      // time here, which gave a near-zero wait on the first rounds and could
      // overshoot the deadline later. The "* 1000" scaling is kept exactly as
      // at the other call site below; the unit tsem_timewait expects is not
      // visible in this file section.
      tsem_timewait(&tmq->rspSem, (timeout - elapsed) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1835
/**
 * Close a consumer.
 *
 * When the consumer is in READY status, offsets are committed synchronously
 * and the consumer then subscribes to an empty topic list.
 *
 * @return 0 on success; the error code of tmq_commit_sync / tmq_subscribe if
 *         either fails; -1 if the empty topic list cannot be created
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  // read the status atomically, matching how tmq_consumer_poll reads it
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    tmq_list_t* lst = tmq_list_new();
    if (lst == NULL) {
      // defensive: do not hand a NULL list to tmq_subscribe
      return -1;
    }
    rsp = tmq_subscribe(tmq, lst);
    tmq_list_destroy(lst);

    if (rsp != 0) {
      return rsp;
    }
  }
  // TODO: free resources
  return 0;
}
L
Liu Jicong 已提交
1853

L
Liu Jicong 已提交
1854 1855
/**
 * Translate a tmq return code into a message.
 *
 * @param err 0, -1, or any other code understood by tstrerror
 * @return "success" for 0, "fail" for -1, otherwise tstrerror(err)
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1863

L
Liu Jicong 已提交
1864 1865 1866 1867
/**
 * Classify a result handle returned by the consumer.
 *
 * @return TMQ_RES_DATA for data results and for meta results whose message
 *         type is TDMT_VND_DELETE; TMQ_RES_TABLE_META for other meta results;
 *         TMQ_RES_INVALID for anything else
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (!TD_RES_TMQ_META(res)) {
    return TMQ_RES_INVALID;
  }

  SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
  // a delete carried in a meta response is reported as data
  return (pMeta->metaRsp.resMsgType == TDMT_VND_DELETE) ? TMQ_RES_DATA : TMQ_RES_TABLE_META;
}

L
Liu Jicong 已提交
1878
/**
 * Return the topic name of a tmq result with the leading "<prefix>." removed.
 *
 * @param res data or meta result handle
 * @return pointer into the result's topic buffer just past the first '.';
 *         the whole stored name if it contains no '.'; NULL if res is not a
 *         tmq result
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr returns NULL when there is no '.'; adding 1 to that would hand the
  // caller an invalid pointer, so fall back to the unmodified name instead
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1890 1891 1892 1893
/**
 * Return the database name of a tmq result with the leading "<prefix>."
 * removed.
 *
 * @param res data or meta result handle
 * @return pointer into the result's db buffer just past the first '.';
 *         the whole stored name if it contains no '.'; NULL if res is not a
 *         tmq result
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr returns NULL when there is no '.'; adding 1 to that would hand the
  // caller an invalid pointer, so fall back to the unmodified name instead
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1902 1903 1904 1905
/**
 * Return the vgroup id a tmq result came from.
 *
 * @return the vgId stored in the data or meta result, or -1 if res is not a
 *         tmq result
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  int32_t vgId = -1;
  if (TD_RES_TMQ(res)) {
    vgId = ((SMqRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_META(res)) {
    vgId = ((SMqMetaRspObj*)res)->vgId;
  }
  return vgId;
}
L
Liu Jicong 已提交
1913 1914 1915 1916 1917 1918 1919 1920

/**
 * Return the table name of the block the result iterator currently points at.
 *
 * @return the name stored in rsp.blockTbName at index resIter, or NULL when
 *         res is not a data result, the response carries no table names, or
 *         resIter is outside [0, blockNum)
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pData = (SMqRspObj*)res;
  bool       hasName = pData->rsp.withTbName && pData->rsp.blockTbName != NULL && pData->resIter >= 0 &&
                 pData->resIter < pData->rsp.blockNum;
  if (!hasName) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
}
1925

1926 1927
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
                                  int8_t t) {
wmmhello's avatar
wmmhello 已提交
1928 1929 1930 1931 1932 1933 1934
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
wmmhello's avatar
wmmhello 已提交
1935

1936 1937 1938 1939
  //  char uid[32] = {0};
  //  sprintf(uid, "%"PRIi64, id);
  //  cJSON* id_ = cJSON_CreateString(uid);
  //  cJSON_AddItemToObject(json, "id", id_);
wmmhello's avatar
wmmhello 已提交
1940 1941 1942 1943
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
  cJSON_AddItemToObject(json, "tableType", tableType);
1944 1945
  //  cJSON* version = cJSON_CreateNumber(1);
  //  cJSON_AddItemToObject(json, "version", version);
wmmhello's avatar
wmmhello 已提交
1946 1947

  cJSON* columns = cJSON_CreateArray();
1948 1949 1950 1951
  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON*   column = cJSON_CreateObject();
    SSchema* s = schemaRow->pSchema + i;
    cJSON*   cname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1952 1953 1954
    cJSON_AddItemToObject(column, "name", cname);
    cJSON* ctype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(column, "type", ctype);
1955
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1956
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1957
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1958
      cJSON_AddItemToObject(column, "length", cbytes);
1959 1960 1961
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1962 1963
      cJSON_AddItemToObject(column, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1964 1965 1966 1967 1968
    cJSON_AddItemToArray(columns, column);
  }
  cJSON_AddItemToObject(json, "columns", columns);

  cJSON* tags = cJSON_CreateArray();
1969 1970 1971 1972
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON*   tag = cJSON_CreateObject();
    SSchema* s = schemaTag->pSchema + i;
    cJSON*   tname = cJSON_CreateString(s->name);
wmmhello's avatar
wmmhello 已提交
1973 1974 1975
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(tag, "type", ttype);
1976
    if (s->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
1977
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
1978
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1979
      cJSON_AddItemToObject(tag, "length", cbytes);
1980 1981 1982
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
1983 1984
      cJSON_AddItemToObject(tag, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1985 1986 1987 1988 1989 1990 1991 1992 1993
    cJSON_AddItemToArray(tags, tag);
  }
  cJSON_AddItemToObject(json, "tags", tags);

  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  return string;
}

1994 1995 1996 1997
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  char*         string = NULL;
wmmhello's avatar
wmmhello 已提交
1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto end;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
2009 2010
  //  cJSON* uid = cJSON_CreateNumber(id);
  //  cJSON_AddItemToObject(json, "uid", uid);
wmmhello's avatar
wmmhello 已提交
2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022
  SName name = {0};
  tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  cJSON* tableName = cJSON_CreateString(name.tname);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  cJSON* alterType = cJSON_CreateNumber(req.alterType);
  cJSON_AddItemToObject(json, "alterType", alterType);
  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
2023 2024
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
2025 2026 2027 2028
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);

2029
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
2030
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
2031
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2032
        cJSON_AddItemToObject(json, "colLength", cbytes);
2033 2034 2035
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2036 2037 2038 2039 2040
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
2041 2042 2043
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
2044 2045 2046 2047
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
2048 2049 2050
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      cJSON*      colName = cJSON_CreateString(field->name);
wmmhello's avatar
wmmhello 已提交
2051 2052 2053
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);
2054
      if (field->type == TSDB_DATA_TYPE_BINARY) {
wmmhello's avatar
wmmhello 已提交
2055
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
2056
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2057
        cJSON_AddItemToObject(json, "colLength", cbytes);
2058 2059 2060
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
wmmhello's avatar
wmmhello 已提交
2061 2062 2063 2064 2065
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
2066 2067 2068 2069
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      cJSON*      colName = cJSON_CreateString(oldField->name);
wmmhello's avatar
wmmhello 已提交
2070 2071 2072 2073 2074 2075 2076 2077 2078 2079
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(newField->name);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

2080
end:
wmmhello's avatar
wmmhello 已提交
2081 2082 2083 2084 2085
  cJSON_Delete(json);
  tFreeSMAltertbReq(&req);
  return string;
}

2086
/**
 * Decode a create-super-table request from a meta response and render it as
 * JSON.
 *
 * The payload starts sizeof(SMsgHead) bytes into metaRsp->metaRsp.
 *
 * @return JSON string from buildCreateTableJson, or NULL if decoding fails
 */
static char* processCreateStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  char*          json = NULL;

  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, body, bodyLen);

  if (tDecodeSVCreateStbReq(&coder, &req) >= 0) {
    json = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
  }

  tDecoderClear(&coder);
  return json;
}

2108
/**
 * Decode an alter-super-table event from a meta response and render it as
 * JSON.
 *
 * The payload (after the SMsgHead) is decoded as an SVCreateStbReq; its
 * alterOriData / alterOriDataLen fields are passed to buildAlterSTableJson.
 *
 * @return JSON string from buildAlterSTableJson, or NULL if decoding fails
 */
static char* processAlterStb(SMqMetaRsp* metaRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  char*          json = NULL;

  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, body, bodyLen);

  if (tDecodeSVCreateStbReq(&coder, &req) >= 0) {
    json = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
  }

  tDecoderClear(&coder);
  return json;
}

2130
/**
 * Serialize a "create child table" event as JSON.
 *
 * Output keys: "type" ("create"), "tableName", "tableType" ("child"),
 * "using" (super table name), "tagNum" and "tags". For a JSON tag a single
 * entry of type TSDB_DATA_TYPE_JSON is emitted; otherwise one entry per tag
 * value with "name", "type" and "value".
 *
 * If the tag values cannot be extracted, or a value buffer cannot be
 * allocated, the JSON is still emitted with the tags collected so far.
 *
 * @param pTag    encoded tag data
 * @param sname   super table name
 * @param name    child table name
 * @param tagName array of tag names, indexed like the tag values
 * @param id      unused
 * @param tagNum  value written to "tagNum"
 * @return string produced by cJSON_PrintUnformatted, or NULL if the root
 *         object cannot be created
 */
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum) {
  char*   string = NULL;
  SArray* pTagVals = NULL;
  cJSON*  json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);

  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("child");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* using = cJSON_CreateString(sname);
  cJSON_AddItemToObject(json, "using", using);
  cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
  cJSON_AddItemToObject(json, "tagNum", tagNumJson);

  cJSON*  tags = cJSON_CreateArray();
  int32_t code = tTagToValArray(pTag, &pTagVals);
  if (code) {
    goto end;
  }

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      goto end;
    }
    char*  pJson = parseTagDatatoJson(pTag);
    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, 0);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    cJSON_AddItemToObject(tag, "type", ttype);
    cJSON* tvalue = cJSON_CreateString(pJson);
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
    taosMemoryFree(pJson);
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);

    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, i);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
    cJSON_AddItemToObject(tag, "type", ttype);

    cJSON* tvalue = NULL;
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
      if (!buf) {
        // `tag` is not attached to `tags` yet, so deleting `json` later would
        // not reach it; free it here to avoid leaking the partial entry
        cJSON_Delete(tag);
        goto end;
      }
      dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
      tvalue = cJSON_CreateString(buf);
      taosMemoryFree(buf);
    } else {
      double val = 0;
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64);
      tvalue = cJSON_CreateNumber(val);
    }

    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
  }

end:
  // attach whatever tags were collected, then render and release the tree
  cJSON_AddItemToObject(json, "tags", tags);
  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  taosArrayDestroy(pTagVals);
  return string;
}

2222
/**
 * Decode a batch create-table request from a meta response and render it as
 * JSON.
 *
 * Every request in the batch is rendered, but only the JSON of the last
 * child/normal table request is returned; earlier strings are released so
 * they do not leak. Requests of any other table type are ignored.
 *
 * @return JSON string for the last rendered request, or NULL if decoding
 *         fails or no request was rendered
 */
static char* processCreateTable(SMqMetaRsp* metaRsp) {
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  SVCreateTbReq*     pCreateReq;
  char*              string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    char* current;
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      current = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
                                      pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum);
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
      current =
          buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
    } else {
      continue;
    }

    // the previous string came from cJSON_PrintUnformatted; release it before
    // it is overwritten
    cJSON_free(string);
    string = current;
  }

_exit:
  // single clear for both the success and the failure path
  tDecoderClear(&decoder);
  return string;
}

2254 2255 2256 2257
/**
 * Decode an alter-table request from a meta response and render it as JSON.
 *
 * Output keys: "type" ("alter"), "tableName", "tableType" ("child" for a tag
 * value update, otherwise "normal"), "alterType", plus action-specific keys
 * ("colName", "colType", "colLength", "colNewName", "colValue",
 * "colValueNull").
 *
 * @return string produced by cJSON_PrintUnformatted, or NULL if decoding,
 *         creating the root object, or allocating the tag value buffer fails
 */
static char* processAlterTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  char*        string = NULL;
  cJSON*       json = NULL;  // declared up front so _exit can always release it

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  cJSON_AddItemToObject(json, "alterType", alterType);

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      cJSON_AddItemToObject(json, "colType", colType);

      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      cJSON_AddItemToObject(json, "colType", colType);
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      cJSON_AddItemToObject(json, "colName", tagName);

      // an empty JSON tag is reported as a NULL value
      bool isNull = vAlterTbReq.isNull;
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true);
          buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
        } else {
          // Sized like the equivalent dataConverToStr call in
          // buildCreateCTableJson (nData + 3); the previous nTagVal + 1 left
          // no room beyond the raw value. Presumably the extra bytes hold the
          // surrounding quotes and the terminator - confirm against
          // dataConverToStr.
          buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 3, 1);
          if (buf == NULL) {
            goto _exit;
          }
          dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
        }

        cJSON* colValue = cJSON_CreateString(buf);
        cJSON_AddItemToObject(json, "colValue", colValue);
        taosMemoryFree(buf);
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      cJSON_AddItemToObject(json, "colValueNull", isNullCJson);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

_exit:
  // the printed string is independent of the tree, so the tree is released on
  // every path (cJSON_Delete is also called on a possibly-NULL root elsewhere
  // in this file)
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2367 2368 2369 2370
// Render a "drop super table" meta message as a JSON string.
//
// The bytes following the SMsgHead in metaRsp->metaRsp are decoded as an SVDropStbReq and
// emitted as {"type":"drop","tableName":<name>,"tableType":"super"}.
//
// Returns the buffer produced by cJSON_PrintUnformatted (the caller releases it), or NULL
// when decoding fails or the JSON object cannot be created.
static char* processDropSTable(SMqMetaRsp* metaRsp) {
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  cJSON*       json = NULL;  // declared up front so the _exit path can always release it
  char*        string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(req.name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate allocation, so the tree is no longer needed.
  // cJSON_Delete accepts NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2398 2399 2400 2401
// Render a batched "drop table" meta message as a JSON string.
//
// The bytes following the SMsgHead in metaRsp->metaRsp are decoded as an SVDropTbBatchReq and
// emitted as {"type":"drop","tableNameList":[<name>, ...]} with one entry per decoded request.
//
// Returns the buffer produced by cJSON_PrintUnformatted (the caller releases it), or NULL
// when decoding fails or a JSON container cannot be created.
static char* processDropTable(SMqMetaRsp* metaRsp) {
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  cJSON*           json = NULL;  // declared up front so the _exit path can always release it
  char*            string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);
  //  cJSON* uid = cJSON_CreateNumber(id);
  //  cJSON_AddItemToObject(json, "uid", uid);
  //  cJSON* tableType = cJSON_CreateString("normal");
  //  cJSON_AddItemToObject(json, "tableType", tableType);

  cJSON* tableNameList = cJSON_CreateArray();
  if (tableNameList == NULL) {
    // Without the array every name item created below would be orphaned.
    goto _exit;
  }
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;

    cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
    cJSON_AddItemToArray(tableNameList, tableName);
  }
  cJSON_AddItemToObject(json, "tableNameList", tableNameList);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate allocation, so the tree is no longer needed.
  // cJSON_Delete accepts NULL.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

2438
// Replay a "create super table" meta message against the connection `taos`.
//
// meta/metaLen describe a buffer holding an SMsgHead followed by an encoded SVCreateStbReq.
// The decoded schema is converted into an SMCreateStbReq (igExists = true, source =
// TD_REQ_FROM_TAOX), serialized, and sent as TDMT_MND_CREATE_STB through launchQueryImpl.
// On success the table's cached meta is removed from the catalog.
//
// Returns TSDB_CODE_SUCCESS or an error code: the buildRequest failure code,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
// otherwise pRequest->code after execution.
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};  // zeroed: tDecoderClear() runs at `end` even if tDecoderInit() was never reached
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
  if (NULL == pReq.pColumns) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema* pSchema = req.schemaRow.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pColumns, &field);
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  if (NULL == pReq.pTags) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = req.suid;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // Serialize once with a NULL buffer to learn the length, then again into the real buffer.
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop the cached meta for this table name after the create succeeded.
    SCatalog* pCatalog = NULL;
    catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveTableMeta(pCatalog, &tableName);
  }

  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}

2526
// Replay a "drop super table" meta message against the connection `taos`.
//
// meta/metaLen describe a buffer holding an SMsgHead followed by an encoded SVDropStbReq.
// The request is converted into an SMDropStbReq (igNotExists = true, source =
// TD_REQ_FROM_TAOX), serialized, and sent as TDMT_MND_DROP_STB through launchQueryImpl.
// On success the table's cached meta is removed from the catalog.
//
// Returns TSDB_CODE_SUCCESS or an error code: the buildRequest failure code,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
// otherwise pRequest->code after execution.
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropStbReq req = {0};
  SDecoder     coder = {0};  // zeroed: tDecoderClear() runs at `end` even if tDecoderInit() was never reached
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.suid = req.suid;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // Serialize once with a NULL buffer to learn the length, then again into the real buffer.
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop the cached meta for this table name after the drop succeeded.
    SCatalog* pCatalog = NULL;
    catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveTableMeta(pCatalog, &tableName);
  }

  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}

// Create-table requests grouped for a single vgroup. taosCreateTable stores one of these per
// vgId in a hash map and hands the map to serializeVgroupsCreateTableBatch.
typedef struct SVgroupCreateTableBatch {
  SVCreateTbBatchReq req;                       // req.pArray collects the SVCreateTbReq entries for this vgroup
  SVgroupInfo        info;                      // vgroup info as returned by catalogGetTableHashVgroup
  char               dbName[TSDB_DB_NAME_LEN];  // copied from the request's pDb
} SVgroupCreateTableBatch;

static void destroyCreateTbReqBatch(void* data) {
2601
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
wmmhello's avatar
wmmhello 已提交
2602 2603 2604
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2605 2606 2607 2608 2609 2610 2611
// Replay a batched "create table" meta message against the connection `taos`.
//
// meta/metaLen describe a buffer holding an SMsgHead followed by an encoded
// SVCreateTbBatchReq. Each decoded request is routed to its vgroup via
// catalogGetTableHashVgroup, the per-vgroup batches are serialized with
// serializeVgroupsCreateTableBatch, and the result is executed as TDMT_VND_CREATE_TABLE.
// On success the cached meta of every touched table is removed.
//
// Returns TSDB_CODE_SUCCESS or an error code: the buildRequest / catalog failure code,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
// otherwise pRequest->code after execution.
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  if (NULL == pRequest->tableList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
    taosArrayPush(pRequest->tableList, &pName);

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      // first table seen for this vgroup: start a new batch
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      // bounded copy: dbName is a fixed-size buffer
      snprintf(tBatch.dbName, sizeof(tBatch.dbName), "%s", pRequest->pDb);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pCreateReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pCreateReq);
    }
  }

  SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    // NOTE(review): pBufArray is not released on this path (nor when the rewrite below
    // fails); how its elements must be freed is not visible from this function.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    removeMeta(pTscObj, pRequest->tableList);
  }

  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

// Drop-table requests grouped for a single vgroup. taosDropTable stores one of these per vgId
// in a hash map and hands the map to serializeVgroupsDropTableBatch.
typedef struct SVgroupDropTableBatch {
  SVDropTbBatchReq req;                       // req.pArray collects the SVDropTbReq entries for this vgroup
  SVgroupInfo      info;                      // vgroup info as returned by catalogGetTableHashVgroup
  char             dbName[TSDB_DB_NAME_LEN];  // NOTE(review): taosDropTable leaves this zeroed — confirm it is unused
} SVgroupDropTableBatch;

static void destroyDropTbReqBatch(void* data) {
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
  taosArrayDestroy(pTbBatch->req.pArray);
}

L
Liu Jicong 已提交
2724 2725 2726 2727 2728 2729 2730
// Replay a batched "drop table" meta message against the connection `taos`.
//
// meta/metaLen describe a buffer holding an SMsgHead followed by an encoded
// SVDropTbBatchReq. Each decoded request is forced to igNotExists = true, routed to its
// vgroup via catalogGetTableHashVgroup, serialized with serializeVgroupsDropTableBatch, and
// executed as TDMT_VND_DROP_TABLE. On success the cached meta of every touched table is
// removed.
//
// Returns TSDB_CODE_SUCCESS or an error code: the buildRequest / catalog failure code,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
// otherwise pRequest->code after execution.
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  if (NULL == pRequest->tableList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // loop to drop table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    taosArrayPush(pRequest->tableList, &pName);
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      // first table seen for this vgroup: start a new batch
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pDropReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pDropReq);
    }
  }

  SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    // NOTE(review): pBufArray is not released on this path (nor when the rewrite below
    // fails); how its elements must be freed is not visible from this function.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    removeMeta(pTscObj, pRequest->tableList);
  }
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

wmmhello's avatar
wmmhello 已提交
2829 2830
// delete from db.tabl where ..       -> delete from tabl where ..
// delete from db    .tabl where ..   -> delete from tabl where ..
L
Liu Jicong 已提交
2831
// static void getTbName(char *sql){
2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859
//  char *ch = sql;
//
//  bool inBackQuote = false;
//  int8_t dotIndex = 0;
//  while(*ch != '\0'){
//    if(!inBackQuote && *ch == '`'){
//      inBackQuote = true;
//      ch++;
//      continue;
//    }
//
//    if(inBackQuote && *ch == '`'){
//      inBackQuote = false;
//      ch++;
//
//      continue;
//    }
//
//    if(!inBackQuote && *ch == '.'){
//      dotIndex ++;
//      if(dotIndex == 2){
//        memmove(sql, ch + 1, strlen(ch + 1) + 1);
//        break;
//      }
//    }
//    ch++;
//  }
//}
wmmhello's avatar
wmmhello 已提交
2860 2861

// SQL text used to replay a decoded SDeleteRes; shared by the sizing and formatting calls.
#define TMQ_DELETE_SQL_FMT "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64

// Replay a "delete data" meta message against the connection `taos`.
//
// meta/metaLen describe a buffer holding an SMsgHead followed by an encoded SDeleteRes. The
// decoded table name, timestamp column and [skey, ekey] range are turned into a
// "delete from ... where ..." statement and run through taos_query.
//
// Returns TSDB_CODE_SUCCESS (also when the query reports TSDB_CODE_PAR_TABLE_NOT_EXIST),
// TSDB_CODE_INVALID_PARA when decoding or formatting fails, TSDB_CODE_OUT_OF_MEMORY when the
// statement buffer cannot be allocated, terrno when taos_query returns no result object,
// otherwise the query's error code.
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  char*      sql = NULL;

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  //  getTbName(req.tableFName);

  // Size the statement first: a fixed 256-byte buffer could overflow with long
  // table/column names.
  int32_t sqlLen = snprintf(NULL, 0, TMQ_DELETE_SQL_FMT, req.tableFName, req.tsColName, req.skey, req.tsColName,
                            req.ekey);
  if (sqlLen < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  sql = taosMemoryCalloc(1, (size_t)sqlLen + 1);
  if (NULL == sql) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  snprintf(sql, (size_t)sqlLen + 1, TMQ_DELETE_SQL_FMT, req.tableFName, req.tsColName, req.skey, req.tsColName,
           req.ekey);
  printf("delete sql:%s\n", sql);

  TAOS_RES* res = taos_query(taos, sql);
  if (NULL == res) {
    // no request object to read a code from
    code = terrno;
    goto end;
  }
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    // the target table is already gone, so there is nothing left to delete
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);

end:
  taosMemoryFreeClear(sql);
  tDecoderClear(&coder);
  return code;
}

#undef TMQ_DELETE_SQL_FMT

2894 2895 2896 2897 2898 2899 2900 2901
// Replay an "alter table" meta message against the connection `taos`.
//
// meta/metaLen describe a buffer holding an SMsgHead followed by an encoded SVAlterTbReq.
// Requests whose action is TSDB_ALTER_TABLE_UPDATE_OPTIONS are skipped. Otherwise the raw
// message is copied, its header vgId is rewritten to the vgroup that owns the table, and the
// copy is executed as TDMT_VND_ALTER_TABLE. When execution succeeds and produced an exec
// result, handleAlterTbExecRes is applied to it.
//
// Returns TSDB_CODE_SUCCESS (also when the vnode reports TSDB_CODE_VND_TABLE_NOT_EXIST) or
// an error code: the buildRequest / catalog failure code, TSDB_CODE_PAR_DB_NOT_SPECIFIED,
// TSDB_CODE_INVALID_PARA when decoding fails, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
// the execution code, or the handleAlterTbExecRes result.
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       coder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);

  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pArray = taosArrayInit(1, sizeof(void*));
  if (NULL == pArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == pVgData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pVgData->vg = pInfo;
  pVgData->pData = taosMemoryMalloc(metaLen);
  if (NULL == pVgData->pData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // forward the original message verbatim, re-targeted at the owning vgroup
  memcpy(pVgData->pData, meta, metaLen);
  ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
  pVgData->size = metaLen;
  pVgData->numOfTables = 1;
  taosArrayPush(pArray, &pVgData);

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    // pArray and pVgData are still owned here and are released at `end`
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);

  code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // Cleared so the `end` path does not free them a second time.
  // NOTE(review): presumably ownership moved to pQuery in the rewrite above — confirm.
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

L
Liu Jicong 已提交
3006
typedef struct {
3007
  SVgroupInfo vg;
L
Liu Jicong 已提交
3008 3009
  void*       data;
} VgData;
3010 3011 3012 3013 3014 3015

static void destroyVgHash(void* data) {
  VgData* vgData = (VgData*)data;
  taosMemoryFreeClear(vgData->data);
}

L
Liu Jicong 已提交
3016 3017
// Write `rows` rows of a raw column-oriented data block into table `tbname` of the
// connection's current database.
//
// taos   - connection handle; read as a pointer to an int64_t reference id.
// rows   - number of rows contained in pData; must not be negative.
// pData  - raw block. The code skips a header of
//          sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t))
//          bytes, then reads numOfCols int32_t column lengths, then per column either a
//          row-offset array (variable-length types) or a null bitmap, followed by the
//          column data.
// tbname - table name without the database prefix.
//
// The rows are re-encoded into a single SSubmitBlk using the table schema fetched from the
// catalog and submitted as TDMT_VND_SUBMIT.
//
// Returns TSDB_CODE_SUCCESS or an error code: TSDB_CODE_INVALID_PARA for bad arguments or
// over-long names, TSDB_CODE_PAR_DB_NOT_SPECIFIED when no database is selected, a catalog
// failure code, an out-of-memory code, or pRequest->code after execution.
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  int32_t        code = TSDB_CODE_SUCCESS;
  STableMeta*    pTableMeta = NULL;
  SQuery*        pQuery = NULL;
  SSubmitReq*    subReq = NULL;  // owned here until it is attached to the data block
  SResultColumn* pCol = NULL;
  SRequestObj*   pRequest = NULL;

  if (NULL == taos || NULL == pData || NULL == tbname || rows < 0) {
    uError("WriteRaw:invalid parameter");
    return TSDB_CODE_INVALID_PARA;
  }

  pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
  if (!pRequest) {
    uError("WriteRaw:createRequest error request is null");
    code = terrno;
    goto end;
  }

  if (!pRequest->pDb) {
    uError("WriteRaw:not use db");
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  // reject names that would overflow the fixed-size SName buffers
  if (strlen(pRequest->pDb) >= sizeof(pName.dbname) || strlen(tbname) >= sizeof(pName.tname)) {
    uError("WriteRaw:db or table name too long");
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  strcpy(pName.dbname, pRequest->pDb);
  strcpy(pName.tname, tbname);

  struct SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw: get catalog error");
    goto end;
  }

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  SVgroupInfo vgData = {0};
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname);
    goto end;
  }

  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
    goto end;
  }
  uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
  uint64_t uid = pTableMeta->uid;
  int32_t  numOfCols = pTableMeta->tableInfo.numOfColumns;

  // Accumulate the fixed-part length, the maximum row size and the number of
  // variable-length columns from the table schema.
  uint16_t fLen = 0;
  int32_t  rowSize = 0;
  int16_t  nVar = 0;
  for (int i = 0; i < numOfCols; i++) {
    SSchema* schema = pTableMeta->schema + i;
    fLen += TYPE_BYTES[schema->type];
    rowSize += schema->bytes;
    if (IS_VAR_DATA_TYPE(schema->type)) {
      nVar++;
    }
  }

  int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
                            (int32_t)TD_BITMAP_BYTES(numOfCols - 1);
  int32_t schemaLen = 0;
  int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;

  int32_t totalLen = sizeof(SSubmitReq) + submitLen;
  subReq = taosMemoryCalloc(1, totalLen);
  if (NULL == subReq) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
  void*       blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
  STSRow*     rowData = POINTER_SHIFT(blkSchema, schemaLen);

  SRowBuilder rb = {0};
  tdSRowInit(&rb, pTableMeta->sversion);
  tdSRowSetTpInfo(&rb, numOfCols, fLen);
  int32_t dataLen = 0;

  // skip the block header, then read the per-column length table
  char*    pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
  int32_t* colLength = (int32_t*)pStart;
  pStart += sizeof(int32_t) * numOfCols;

  pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
  if (NULL == pCol) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  // locate each column's offset array / null bitmap and its data inside pData
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
      pCol[i].offset = (int32_t*)pStart;
      pStart += rows * sizeof(int32_t);
    } else {
      pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(rows);
    }

    pCol[i].pData = pStart;
    pStart += colLength[i];
  }

  // convert the column-oriented block into rows appended one after another
  for (int32_t j = 0; j < rows; j++) {
    tdSRowResetBuf(&rb, rowData);
    int32_t offset = 0;
    for (int32_t k = 0; k < numOfCols; k++) {
      const SSchema* pColumn = &pTableMeta->schema[k];

      if (IS_VAR_DATA_TYPE(pColumn->type)) {
        // an offset of -1 marks a NULL value in a variable-length column
        if (pCol[k].offset[j] != -1) {
          char* data = pCol[k].pData + pCol[k].offset[j];
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        } else {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        }
      } else {
        if (!colDataIsNull_f(pCol[k].nullbitmap, j)) {
          char* data = pCol[k].pData + pColumn->bytes * j;
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
        } else {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        }
      }

      offset += TYPE_BYTES[pColumn->type];
    }
    int32_t rowLen = TD_ROW_LEN(rowData);
    rowData = POINTER_SHIFT(rowData, rowLen);
    dataLen += rowLen;
  }

  taosMemoryFreeClear(pCol);

  blk->uid = htobe64(uid);
  blk->suid = htobe64(suid);
  blk->sversion = htonl(pTableMeta->sversion);
  blk->schemaLen = htonl(schemaLen);
  blk->numOfRows = htonl(rows);
  blk->dataLen = htonl(dataLen);
  subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
  subReq->numOfBlocks = 1;

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    uError("create SQuery error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
  pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (NULL == pQuery->pRoot) {
    uError("create pQuery->pRoot error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
  nodeStmt->payloadType = PAYLOAD_TYPE_KV;
  nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == nodeStmt->pDataBlocks) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == dst) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto end;
  }
  dst->vg = vgData;
  dst->numOfTables = subReq->numOfBlocks;
  dst->size = subReq->length;
  dst->pData = (char*)subReq;
  subReq->header.vgId = htonl(dst->vg.vgId);
  subReq->version = htonl(1);
  subReq->header.contLen = htonl(subReq->length);
  subReq->length = htonl(subReq->length);
  subReq->numOfBlocks = htonl(subReq->numOfBlocks);
  subReq = NULL;  // now referenced through dst->pData; must not be freed at `end`
  taosArrayPush(nodeStmt->pDataBlocks, &dst);

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  taosMemoryFreeClear(pCol);
  taosMemoryFreeClear(subReq);
  taosMemoryFreeClear(pTableMeta);
  destroyRequest(pRequest);  // same teardown order as the other raw-replay helpers in this file
  qDestroyQuery(pQuery);
  return code;
}

L
Liu Jicong 已提交
3199 3200 3201 3202
// Decode a serialized SMqDataRsp from `data` (`dataLen` bytes) and submit its rows
// to the database currently selected on the connection behind `taos`.
//
// For every data block in the response the target table is resolved through the
// catalog, the rows are re-encoded as STSRow records, and the resulting submit
// block is appended to a per-vgroup SSubmitReq buffer kept in a hash keyed by
// vgId. Once all blocks are converted, one SVgDataBlocks entry per vgroup is
// attached to a VNODE_MODIF statement and executed with launchQueryImpl().
//
// Returns TSDB_CODE_SUCCESS on success, terrno if the request object cannot be
// created, or the error code of the first step that failed.
static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SHashObj* pVgHash = NULL;
  SQuery*   pQuery = NULL;
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  terrno = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
  if (!pRequest) {
    uError("WriteRaw:createRequest error request is null");
    return terrno;
  }

  // resIter is pre-incremented by the block loop below, so start one before the first block
  rspObj.resIter = -1;
  rspObj.resType = RES_TYPE__TMQ;

  tDecoderInit(&decoder, data, dataLen);
  code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
  if (code != 0) {
    uError("WriteRaw:decode smqDataRsp error");
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  if (!pRequest->pDb) {
    uError("WriteRaw:not use db");
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (NULL == pVgHash) {
    uError("WriteRaw:create vgroup hash error");
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgHash, destroyVgHash);

  struct SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    uError("WriteRaw: get gatlog error");
    goto end;
  }

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  uDebug("raw data block num:%d", rspObj.rsp.blockNum);
  while (++rspObj.resIter < rspObj.rsp.blockNum) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
    if (!rspObj.rsp.withSchema) {
      uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
      // without a schema the block cannot be converted; report it instead of returning success
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }
    SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
    setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);

    code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw: setQueryResultFromRsp error");
      goto end;
    }

    // accumulate per-schema sizes used to size the row buffer
    uint16_t fLen = 0;
    int32_t  rowSize = 0;
    int16_t  nVar = 0;
    for (int i = 0; i < pSW->nCols; i++) {
      SSchema* schema = pSW->pSchema + i;
      fLen += TYPE_BYTES[schema->type];
      rowSize += schema->bytes;
      if (IS_VAR_DATA_TYPE(schema->type)) {
        nVar++;
      }
    }

    int32_t rows = rspObj.resInfo.numOfRows;
    int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
                              (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1);
    int32_t schemaLen = 0;
    int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;

    const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
    if (!tbName) {
      uError("WriteRaw: tbname is null");
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }

    uDebug("raw data tbname:%s", tbName);
    SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
    // tbName comes from the decoded payload; make sure both names fit before copying
    if (strlen(pRequest->pDb) >= sizeof(pName.dbname) || strlen(tbName) >= sizeof(pName.tname)) {
      uError("WriteRaw: db or table name too long, table name: %s", tbName);
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }
    strcpy(pName.dbname, pRequest->pDb);
    strcpy(pName.tname, tbName);

    VgData vgData = {0};
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
      goto end;
    }

    SSubmitReq* subReq = NULL;
    SSubmitBlk* blk = NULL;
    void*       hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
    if (hData) {
      // this vgroup already has a request buffer: grow it and append after the existing blocks
      vgData = *(VgData*)hData;

      int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
      void*   tmp = taosMemoryRealloc(vgData.data, totalLen);
      if (tmp == NULL) {
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto end;
      }
      vgData.data = tmp;
      // keep the hash entry pointing at the (possibly moved) buffer
      ((VgData*)hData)->data = tmp;
      subReq = (SSubmitReq*)(vgData.data);
      blk = POINTER_SHIFT(vgData.data, subReq->length);
    } else {
      // first block for this vgroup: allocate request header plus room for this block
      int32_t totalLen = sizeof(SSubmitReq) + submitLen;
      void*   tmp = taosMemoryCalloc(1, totalLen);
      if (tmp == NULL) {
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto end;
      }
      vgData.data = tmp;
      taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
      subReq = (SSubmitReq*)(vgData.data);
      subReq->length = sizeof(SSubmitReq);
      subReq->numOfBlocks = 0;

      blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
    }

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code != TSDB_CODE_SUCCESS) {
      uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName);
      goto end;
    }
    uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
    uint64_t uid = pTableMeta->uid;
    taosMemoryFreeClear(pTableMeta);

    void*   blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
    STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);

    SRowBuilder rb = {0};
    tdSRowInit(&rb, pSW->version);
    tdSRowSetTpInfo(&rb, pSW->nCols, fLen);
    int32_t blkDataLen = 0;

    for (int32_t j = 0; j < rows; j++) {
      tdSRowResetBuf(&rb, rowData);

      doSetOneRowPtr(&rspObj.resInfo);
      rspObj.resInfo.current += 1;

      int32_t offset = 0;
      for (int32_t k = 0; k < pSW->nCols; k++) {
        const SSchema* pColumn = &pSW->pSchema[k];
        char*          colData = rspObj.resInfo.row[k];
        if (!colData) {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
        } else {
          if (IS_VAR_DATA_TYPE(pColumn->type)) {
            // step back over the var-data header so the length prefix is passed along with the value
            colData -= VARSTR_HEADER_SIZE;
          }
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
        }
        offset += TYPE_BYTES[pColumn->type];
      }
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
      blkDataLen += rowLen;
    }

    // block header fields are stored in network byte order
    blk->uid = htobe64(uid);
    blk->suid = htobe64(suid);
    blk->sversion = htonl(pSW->version);
    blk->schemaLen = htonl(schemaLen);
    blk->numOfRows = htonl(rows);
    blk->dataLen = htonl(blkDataLen);
    // request-level counters stay in host order until all blocks have been appended
    subReq->length += sizeof(SSubmitBlk) + schemaLen + blkDataLen;
    subReq->numOfBlocks++;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    uError("create SQuery error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
  pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (NULL == pQuery->pRoot) {
    uError("create pQuery->pRoot error");
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
  nodeStmt->payloadType = PAYLOAD_TYPE_KV;

  int32_t numOfVg = taosHashGetSize(pVgHash);
  nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  if (NULL == nodeStmt->pDataBlocks) {
    uError("WriteRaw:create data block array error");
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto end;
  }

  VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
  while (vData) {
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == dst) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto end;
    }
    dst->vg = vData->vg;
    SSubmitReq* subReq = (SSubmitReq*)(vData->data);
    dst->numOfTables = subReq->numOfBlocks;
    dst->size = subReq->length;
    dst->pData = (char*)subReq;
    vData->data = NULL;  // buffer is now referenced by dst; detach it from the hash entry
    // finalize the request header in network byte order
    subReq->header.vgId = htonl(dst->vg.vgId);
    subReq->version = htonl(1);
    subReq->header.contLen = htonl(subReq->length);
    subReq->length = htonl(subReq->length);
    subReq->numOfBlocks = htonl(subReq->numOfBlocks);
    taosArrayPush(nodeStmt->pDataBlocks, &dst);
    vData = (VgData*)taosHashIterate(pVgHash, vData);
  }

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  tDecoderClear(&decoder);
  taos_free_result(&rspObj);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  return code;
}

wmmhello's avatar
wmmhello 已提交
3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461
// Build a JSON description of a TMQ meta result.
// Returns NULL when `res` is not a TMQ meta result or when its message type is
// not one of the table/super-table create, alter or drop messages handled here;
// otherwise returns whatever the matching process* helper produces.
char* tmq_get_json_meta(TAOS_RES* res) {
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
  switch (pMetaRspObj->metaRsp.resMsgType) {
    case TDMT_VND_CREATE_STB:
      return processCreateStb(&pMetaRspObj->metaRsp);
    case TDMT_VND_ALTER_STB:
      return processAlterStb(&pMetaRspObj->metaRsp);
    case TDMT_VND_DROP_STB:
      return processDropSTable(&pMetaRspObj->metaRsp);
    case TDMT_VND_CREATE_TABLE:
      return processCreateTable(&pMetaRspObj->metaRsp);
    case TDMT_VND_ALTER_TABLE:
      return processAlterTable(&pMetaRspObj->metaRsp);
    case TDMT_VND_DROP_TABLE:
      return processDropTable(&pMetaRspObj->metaRsp);
    default:
      return NULL;
  }
}

// Release a JSON string previously returned by tmq_get_json_meta().
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}

L
Liu Jicong 已提交
3462 3463
// Export a TMQ result as raw bytes into `raw`.
//
// - Meta result: `raw` is filled with the pointer, length and message type held
//   by the result's meta response; nothing is copied.
// - Data result: the SMqDataRsp is encoded into a newly allocated buffer and
//   `raw->raw_type` is set to RES_TYPE__TMQ, which is the type tmq_free_raw()
//   checks before freeing `raw->raw`.
//
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when an argument
// is NULL, TSDB_CODE_TMQ_INVALID_MSG when `res` is neither kind of TMQ result,
// TSDB_CODE_OUT_OF_MEMORY when the buffer cannot be allocated, and -1 when
// encoding fails.
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (!raw || !res) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    raw->raw = pMetaRspObj->metaRsp.metaRsp;
    raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
    raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
  } else if (TD_RES_TMQ(res)) {
    SMqRspObj* rspObj = ((SMqRspObj*)res);

    // first pass: compute the encoded size
    int32_t len = 0;
    int32_t code = 0;
    tEncodeSize(tEncodeSMqDataRsp, &rspObj->rsp, len, code);
    if (code < 0) {
      return -1;
    }

    void* buf = taosMemoryCalloc(1, len);
    if (buf == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // second pass: encode into the buffer
    SEncoder encoder = {0};
    tEncoderInit(&encoder, buf, len);
    code = tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
    tEncoderClear(&encoder);
    if (code < 0) {
      // do not hand a partially encoded buffer to the caller
      taosMemoryFree(buf);
      return -1;
    }

    raw->raw = buf;
    raw->raw_len = len;
    raw->raw_type = RES_TYPE__TMQ;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  return TSDB_CODE_SUCCESS;
}

// Free the buffer held by `raw`, but only when its type is RES_TYPE__TMQ;
// any other raw type is left untouched.
void tmq_free_raw(tmq_raw_data raw) {
  if (raw.raw_type != RES_TYPE__TMQ) {
    return;
  }
  taosMemoryFree(raw.raw);
}

L
Liu Jicong 已提交
3502
// Apply a raw TMQ message to the connection `taos`, dispatching on raw.raw_type
// to the matching handler. Returns the handler's result, or
// TSDB_CODE_INVALID_PARA when `taos` is NULL or the type is not handled here.
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  if (!taos) {
    return TSDB_CODE_INVALID_PARA;
  }

  // create and alter super-table messages both go through taosCreateStb
  if (raw.raw_type == TDMT_VND_CREATE_STB || raw.raw_type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, raw.raw, raw.raw_len);
  }
  if (raw.raw_type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, raw.raw, raw.raw_len);
  }
  if (raw.raw_type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, raw.raw, raw.raw_len);
  }
  if (raw.raw_type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, raw.raw, raw.raw_len);
  }
  if (raw.raw_type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, raw.raw, raw.raw_len);
  }
  if (raw.raw_type == TDMT_VND_DELETE) {
    return taosDeleteData(taos, raw.raw, raw.raw_len);
  }
  if (raw.raw_type == RES_TYPE__TMQ) {
    return tmqWriteRaw(taos, raw.raw, raw.raw_len);
  }

  return TSDB_CODE_INVALID_PARA;
}

L
Liu Jicong 已提交
3527
// Commit through tmqCommitInner2 with the third argument 0 and the fourth
// argument 1, forwarding `cb` and `param`; the inner call's result is discarded.
// NOTE(review): the fourth argument presumably selects async mode — confirm
// against tmqCommitInner2.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}

3532 3533 3534 3535
// Commit through tmqCommitInner2 with the third and fourth arguments 0 and no
// callback, returning its result code.
// NOTE(review): the fourth argument presumably selects sync mode — confirm
// against tmqCommitInner2.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL);
  return code;
}