tmq.c 88.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34 35
int32_t tmqAskEp(tmq_t* tmq, bool async);

// File-wide TMQ state shared by every consumer created through this file.
typedef struct {
  int8_t inited;  // 0 until tmq_consumer_new wins the compare-exchange and creates the timer
  tmr_h  timer;   // handle returned by taosTmrInit(); passed to taosTmrReset() when delayed tasks are re-armed
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
// Minimal view of a queued response item: SMqAskEpRspWrapper and SMqPollRspWrapper
// in this file begin with the same two fields.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60 61 62 63 64
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         spEnable;
  int32_t        spBatchSize;
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
65
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
66
  void*          commitCbUserParam;
L
Liu Jicong 已提交
67 68 69
};

struct tmq_t {
L
Liu Jicong 已提交
70
  // conf
L
Liu Jicong 已提交
71 72
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
73
  int8_t         withTbName;
L
Liu Jicong 已提交
74
  int8_t         useSnapshot;
L
Liu Jicong 已提交
75
  int8_t         autoCommit;
L
Liu Jicong 已提交
76
  int32_t        autoCommitInterval;
L
Liu Jicong 已提交
77
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
78
  int64_t        consumerId;
L
Liu Jicong 已提交
79 80
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
81 82 83 84

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
85 86
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
87
  int32_t epSkipCnt;
L
Liu Jicong 已提交
88
#endif
L
Liu Jicong 已提交
89 90
  int64_t pollCnt;

L
Liu Jicong 已提交
91 92 93 94 95
  // timer
  tmr_h hbTimer;
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
96 97 98 99
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
100
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
101
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
102
  STaosQall*  qall;
L
Liu Jicong 已提交
103 104 105 106
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
107 108
};

X
Xiaoyu Wang 已提交
109 110 111 112 113 114 115 116
// Values for SMqClientVg.vgStatus.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Values for tmq_t.status; a new consumer starts in TMQ_CONSUMER_STATUS__INIT.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
};

L
Liu Jicong 已提交
120 121 122 123 124 125
// Task kinds written to tmq_t.delayedTask by the timer callbacks and
// dispatched by tmqHandleAllDelayedTask.
enum {
  TMQ_DELAYED_TASK__HB = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
126
typedef struct {
127 128 129
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
130 131 132 133
  /*int64_t      committedOffset;*/
  /*int64_t      currentOffset;*/
  STqOffsetVal committedOffsetNew;
  STqOffsetVal currentOffsetNew;
L
Liu Jicong 已提交
134
  // connection info
135
  int32_t vgId;
X
Xiaoyu Wang 已提交
136
  int32_t vgStatus;
L
Liu Jicong 已提交
137
  int32_t vgSkipCnt;
138 139 140
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
141
typedef struct {
142
  // subscribe info
L
Liu Jicong 已提交
143
  char* topicName;
L
Liu Jicong 已提交
144
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
145 146 147

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
148 149
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
150 151
} SMqClientTopic;

L
Liu Jicong 已提交
152 153 154 155 156
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
157
  union {
L
Liu Jicong 已提交
158 159
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
160
  };
L
Liu Jicong 已提交
161 162
} SMqPollRspWrapper;

L
Liu Jicong 已提交
163
typedef struct {
L
Liu Jicong 已提交
164 165 166
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
167
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
168

L
Liu Jicong 已提交
169
typedef struct {
170
  tmq_t*  tmq;
L
Liu Jicong 已提交
171
  int32_t code;
L
Liu Jicong 已提交
172
  int32_t async;
X
Xiaoyu Wang 已提交
173
  tsem_t  rspSem;
174 175
} SMqAskEpCbParam;

L
Liu Jicong 已提交
176
typedef struct {
L
Liu Jicong 已提交
177 178
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
179
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
180
  int32_t         epoch;
L
Liu Jicong 已提交
181
  int32_t         vgId;
L
Liu Jicong 已提交
182
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
183
} SMqPollCbParam;
184

L
Liu Jicong 已提交
185
// Disabled (#if 0): callback context of the older commit implementation
// (tmqCommitCb / tmqCommitInner), which is disabled as well.
#if 0
typedef struct {
  tmq_t*         tmq;
  int8_t         async;
  int8_t         automatic;
  int8_t         freeOffsets;
  tmq_commit_cb* userCb;
  tsem_t         rspSem;
  int32_t        rspErr;
  SArray*        offsets;
  void*          userParam;
} SMqCommitCbParam;
#endif
L
Liu Jicong 已提交
198

199
typedef struct {
L
Liu Jicong 已提交
200 201 202 203
  tmq_t* tmq;
  int8_t automatic;
  int8_t async;
  /*int8_t         freeOffsets;*/
L
Liu Jicong 已提交
204 205
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
206
  int32_t        rspErr;
207
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
208 209 210 211
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
212 213 214 215 216 217 218
} SMqCommitCbParamSet;

// Per-request callback context built in tmqSendCommitReq and received by tmqCommitCb2.
typedef struct {
  SMqCommitCbParamSet* params;   // state shared with the other requests of the same commit call
  STqOffset*           pOffset;  // offset record that was encoded into this request
} SMqCommitCbParam2;

219
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
220
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
221
  conf->withTbName = false;
L
Liu Jicong 已提交
222
  conf->autoCommit = true;
L
Liu Jicong 已提交
223
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
224
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
225 226 227
  return conf;
}

L
Liu Jicong 已提交
228
/**
 * Release a configuration together with the connection strings it owns.
 * A NULL conf is ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  // ip / user / pass are heap copies made by tmq_conf_set
  char* owned[] = {conf->ip, conf->user, conf->pass};
  for (int32_t i = 0; i < (int32_t)(sizeof(owned) / sizeof(owned[0])); i++) {
    if (owned[i] != NULL) {
      taosMemoryFree(owned[i]);
    }
  }

  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
238 239
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
240
    return TMQ_CONF_OK;
241
  }
L
Liu Jicong 已提交
242

243 244
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
245 246
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
247

L
Liu Jicong 已提交
248 249
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
250
      conf->autoCommit = true;
L
Liu Jicong 已提交
251 252
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
253
      conf->autoCommit = false;
L
Liu Jicong 已提交
254 255 256 257
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
258
  }
L
Liu Jicong 已提交
259

L
Liu Jicong 已提交
260 261 262 263 264
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277 278
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
279

280 281
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
282
      conf->withTbName = true;
L
Liu Jicong 已提交
283
      return TMQ_CONF_OK;
284
    } else if (strcmp(value, "false") == 0) {
285
      conf->withTbName = false;
L
Liu Jicong 已提交
286
      return TMQ_CONF_OK;
287 288 289 290 291
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
292
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
293
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
294
      conf->spEnable = true;
L
Liu Jicong 已提交
295 296
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
297
      conf->spEnable = false;
L
Liu Jicong 已提交
298 299 300 301 302 303
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
304 305 306 307 308
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->spBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
309
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
310 311 312
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
313
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
314 315 316
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
317
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
318 319 320
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
321
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
322 323 324
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
325
  if (strcmp(key, "td.connect.db") == 0) {
326
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
327 328 329
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
330
  return TMQ_CONF_UNKNOWN;
331 332 333
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
334 335
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
336 337
}

L
Liu Jicong 已提交
338 339
/**
 * Append a copy of src to the topic list.
 *
 * @param list list to extend
 * @param src  NUL-terminated topic name; it is duplicated, the caller keeps ownership of src
 * @return 0 on success, -1 if an argument is NULL, the copy cannot be allocated,
 *         or the array push fails
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL) return -1;

  char* topic = strdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(&list->container, &topic) == NULL) {
    // the list did not take ownership, so the copy must be released here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
345
/**
 * Destroy a topic list and every topic string stored in it.
 * A NULL list is ignored instead of forming a member address from a null pointer.
 */
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
350 351 352 353 354 355 356 357 358 359
/** Number of topics currently stored in the list. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/**
 * Expose the list's element storage as an array of C strings.
 * The pointer is the container's own buffer; nothing is copied.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
360 361 362 363
/**
 * Format "<topicName>:<vg>" into dst.
 *
 * @param dst       destination buffer; the caller must make it large enough, no bound is enforced
 * @param topicName topic name
 * @param vg        vgroup id
 * @return number of characters written, excluding the terminator (sprintf's result)
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
364
// Disabled (#if 0): response callback of the older commit implementation.
// Async: reports the result through the automatic or user callback and frees the context.
// Sync: wakes the waiting caller.
#if 0
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code;
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
    }

    if (pParam->freeOffsets) {
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}
#endif
L
Liu Jicong 已提交
386

D
dapan1121 已提交
387
/**
 * Response callback for one offset-commit request built by tmqSendCommitReq.
 *
 * Records a failure in the shared parameter set, releases the per-request context,
 * and counts the response down. The callback that brings the counter to zero
 * completes the commit: in async mode it invokes the automatic or user callback,
 * in sync mode it posts the semaphore the caller is waiting on.
 *
 * @param param SMqCommitCbParam2 allocated by tmqSendCommitReq; released here
 * @param pBuf  response buffer (not inspected)
 * @param code  result of this request
 * @return always 0
 */
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  // The original never stored the response code, so failed commits were reported as
  // success. Store it before the count-down so the completing callback observes it.
  if (code != 0) {
    pParamSet->rspErr = code;
  }

  // Nothing else references the per-request context once the response has arrived.
  if (pParam->pOffset != NULL) {
    taosMemoryFree(pParam->pOffset);
  }
  taosMemoryFree(pParam);

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum != 0) {
    return 0;
  }

  // no more waiting rsp
  if (!pParamSet->async) {
    tsem_post(&pParamSet->rspSem);
    return 0;
  }

  if (pParamSet->automatic) {
    if (pParamSet->tmq->commitCb) {
      pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
    }
  } else if (pParamSet->userCb) {
    pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
  }

  // NOTE(review): pParamSet is not released here because the issuing functions still
  // read it after dispatching; its async-mode lifetime needs a coordinated fix.
  return 0;
}

L
Liu Jicong 已提交
428 429 430 431 432 433 434
/**
 * Build and dispatch one offset-commit request for a vgroup of a topic.
 *
 * The committed offset is pVg->currentOffsetNew, and the subscription key is
 * "<groupId><TMQ_SEPARATOR><topicName>". The response is handled by tmqCommitCb2.
 *
 * @param tmq       consumer
 * @param pVg       vgroup whose current offset is committed
 * @param pTopic    topic the vgroup belongs to
 * @param pParamSet state shared by all requests of this commit call; its counters are
 *                  incremented only when the request is dispatched
 * @return 0 when the request was handed to the transport, -1 on failure
 *         (nothing is sent and every intermediate allocation is released)
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset*         pOffset = NULL;
  void*              buf = NULL;
  SMqCommitCbParam2* pParam = NULL;
  SMsgSendInfo*      pMsgSendInfo = NULL;
  int32_t            len = 0;
  int32_t            code = 0;

  pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pOffset->val = pVg->currentOffsetNew;

  // subKey = group id + separator + topic name; refuse keys that do not fit the buffer
  size_t groupLen = strlen(tmq->groupId);
  size_t topicLen = strlen(pTopic->topicName);
  if (groupLen + 1 + topicLen >= sizeof(pOffset->subKey)) {
    goto FAIL;
  }
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  memcpy(pOffset->subKey + groupLen + 1, pTopic->topicName, topicLen + 1);

  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffsetNew = pVg->currentOffsetNew;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->fp = tmqCommitCb2;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // Count the request BEFORE dispatching it: tmqCommitCb2 may run on another thread as
  // soon as the message is sent and decrements waitingRspNum atomically.
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  pParamSet->totalRspNum++;

  // send msg
  // NOTE(review): the transport's return value is ignored, as in the original; whether
  // a failed send still invokes the callback cannot be told from this file.
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  if (pMsgSendInfo != NULL) taosMemoryFree(pMsgSendInfo);
  if (pParam != NULL) taosMemoryFree(pParam);
  if (buf != NULL) taosMemoryFree(buf);
  if (pOffset != NULL) taosMemoryFree(pOffset);
  return -1;
}

/**
 * Commit the offset of the topic/vgroup that produced the given message.
 *
 * At most one commit request is sent: for the first vgroup that matches the
 * message's topic and vgId and whose current offset differs from the committed one.
 *
 * @param tmq       consumer
 * @param msg       result object (data or meta) identifying topic and vgroup; must not be NULL
 * @param async     non-zero: return immediately, completion is reported by tmqCommitCb2
 * @param userCb    user callback for async mode, may be NULL
 * @param userParam opaque argument for userCb
 * @return TSDB_CODE_TMQ_INVALID_MSG for an unsupported result type; -1 when the
 *         parameter set cannot be allocated, or in sync mode when the request cannot
 *         be built; in sync mode otherwise the commit result; 0 in async mode
 *         (a build failure is reported through userCb with code -1)
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  bool attempted = false;   // a matching vgroup needed a commit
  bool sendFailed = false;  // ... but the request could not be built

  for (int32_t i = 0; !attempted && i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;
      if (pVg->currentOffsetNew.type <= 0) continue;
      if (tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) continue;

      sendFailed = (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0);
      attempted = true;
      break;
    }
  }

  if (sendFailed) {
    // Nothing was dispatched, so no callback can reference pParamSet: release it
    // (the original leaked it together with the semaphore).
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    if (async) {
      if (userCb != NULL) userCb(tmq, -1, userParam);
      return 0;
    }
    // the original returned 0 here, hiding the failure from sync callers
    return -1;
  }

  if (pParamSet->totalRspNum == 0) {
    // nothing to commit
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (async) {
    // NOTE(review): pParamSet stays allocated after tmqCommitCb2 completes; releasing
    // it needs an ownership hand-over to the callback.
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  // exactly one request was sent and its callback has already posted, so the
  // parameter set has no remaining users
  taosMemoryFree(pParamSet);
  return code;
}

/**
 * Commit consumed offsets.
 *
 * With a message, only that message's topic/vgroup is committed (tmqCommitMsgImpl).
 * Without one, a commit request is sent for every vgroup of every subscribed topic
 * whose current offset differs from the committed one. Requests that cannot be
 * built are skipped (best effort).
 *
 * @param tmq       consumer
 * @param msg       message to commit, or NULL to commit everything
 * @param automatic non-zero for timer-driven commits (result goes to tmq->commitCb)
 * @param async     non-zero: do not wait for the responses
 * @param userCb    user callback for manual async commits
 * @param userParam opaque argument for userCb
 * @return -1 when the parameter set cannot be allocated; 0 when nothing had to be sent
 *         or in async mode; in sync mode the commit result recorded by the callbacks
 */
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                        void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      bool dirty = pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew);
      if (!dirty) continue;

      // best effort: a vgroup whose request cannot be built is skipped
      (void)tmqSendCommitReq(tmq, pVg, pTopic, pParamSet);
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // nothing was dispatched, so the parameter set has no other users
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (async) {
    // Completion (including errors) is delivered by tmqCommitCb2. The original had an
    // "error && async" branch here that could never run because code was forced to 0.
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  // NOTE(review): pParamSet is intentionally not freed here: with several requests in
  // flight a callback may still touch it after the semaphore was posted.

  // the original discarded this value and always returned 0, unlike tmqCommitMsgImpl
  return code;
}

L
Liu Jicong 已提交
637 638
// Disabled (#if 0): older commit implementation that sent all offsets to the
// management node in a single TDMT_MND_MQ_COMMIT_OFFSET request.
#if 0
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
  int8_t               freeOffsets;
  int32_t              code = -1;

  if (msg == NULL) {
    freeOffsets = 1;
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
    freeOffsets = 0;
    pOffsets = (SArray*)&msg->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  SEncoder encoder;

  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  if (code < 0) {
    goto END;
  }
  int32_t tlen = encoder.pos;
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    tEncoderClear(&encoder);
    goto END;
  }
  tEncoderClear(&encoder);

  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->freeOffsets = freeOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  } else {
    code = 0;
  }

  // avoid double free if msg is sent
  buf = NULL;

END:
  if (buf) taosMemoryFree(buf);
  /*if (pParam) taosMemoryFree(pParam);*/
  /*if (sendInfo) taosMemoryFree(sendInfo);*/

  if (code != 0 && async) {
    if (automatic) {
      tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
    } else {
      userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

  if (!async && freeOffsets) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}
#endif
L
Liu Jicong 已提交
753

L
Liu Jicong 已提交
754 755
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
756
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
757 758
  *pTaskType = TMQ_DELAYED_TASK__HB;
  taosWriteQitem(tmq->delayedTask, pTaskType);
759
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
760 761 762 763
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
764
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
765 766
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
767
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
768 769 770 771
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
772
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
773 774
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
775
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
776 777 778 779 780 781 782 783 784 785 786
}

/**
 * Run every task currently queued on tmq->delayedTask.
 *
 * Heartbeat: asks for endpoints asynchronously and re-arms the heartbeat timer (1000).
 * Commit: starts an automatic async commit and re-arms the commit timer
 * (tmq->autoCommitInterval). Report: no action.
 *
 * @return 0 on success, -1 with terrno set when the temporary queue view cannot be allocated
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosReadAllQitems(tmq->delayedTask, qall);

  while (1) {
    // reset before every fetch so an empty queue is detected reliably
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    switch (*pTaskType) {
      case TMQ_DELAYED_TASK__HB:
        tmqAskEp(tmq, true);
        taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
        break;
      case TMQ_DELAYED_TASK__COMMIT:
        tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
        break;
      case TMQ_DELAYED_TASK__REPORT:
        break;
      default:
        ASSERT(0);
        break;
    }
    taosFreeQitem(pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
802
/**
 * Discard every response the consumer has not processed yet: first the items already
 * moved to tmq->qall, then everything still waiting in tmq->mqueue.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  for (int32_t pass = 0; pass < 2; pass++) {
    if (pass == 1) {
      // second pass: pull the pending queue into qall and drain it the same way
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }

    while (1) {
      // Reset before every fetch, as tmqHandleAllDelayedTask does: if the fetch leaves
      // the out-parameter untouched on an empty queue, a stale pointer would otherwise
      // be freed a second time.
      SMqRspWrapper* msg = NULL;
      taosGetQitem(tmq->qall, (void**)&msg);
      if (msg == NULL) break;
      taosFreeQitem(msg);
    }
  }
}

D
dapan1121 已提交
823
/**
 * Response callback of a subscribe request: hand the result code to the waiting
 * caller and wake it up.
 *
 * @param param SMqSubscribeCbParam of the request
 * @param pMsg  response buffer (unused)
 * @param code  result of the request
 * @return always 0
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pCbParam = param;
  pCbParam->rspErr = code;
  tsem_post(&pCbParam->rspSem);
  return 0;
}
830

L
Liu Jicong 已提交
831
/**
 * Append the names of the currently subscribed topics to *topics.
 *
 * The part after the first '.' of each stored topic name is reported; a name
 * without a '.' is reported unchanged.
 *
 * @param tmq    consumer
 * @param topics in/out list; when *topics is NULL a new list is created and returned
 *               through it (the caller releases it with tmq_list_destroy)
 * @return 0 on success, -1 when the list cannot be created or extended
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) return -1;
  }

  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // the original computed strchr(...) + 1 unconditionally and crashed on names without '.'
    const char* dot = strchr(topic->topicName, '.');
    const char* shortName = (dot != NULL) ? dot + 1 : topic->topicName;

    if (tmq_list_append(*topics, shortName) < 0) return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
842 843 844
/**
 * Drop all subscriptions by subscribing to an empty topic list.
 *
 * @return the result of tmq_subscribe, or TSDB_CODE_OUT_OF_MEMORY when the empty
 *         list cannot be created
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    // do not hand a NULL list to tmq_subscribe
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
849
// Disabled (#if 0): older constructor that took an existing connection.
#if 0
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  tsem_init(&pTmq->rspSem, 0, 0);

  return pTmq;
}
#endif
L
Liu Jicong 已提交
883

L
Liu Jicong 已提交
884
/**
 * Create a consumer from a configuration object.
 *
 * Lazily creates the module-wide timer (tmqMgmt.timer) on first use, allocates
 * the consumer, copies the configuration into it, assigns a consumer id,
 * creates its queues and response semaphore, and finally opens the connection
 * with taos_connect_internal().
 *
 * @param conf       configuration; user/pass fall back to TSDB_DEFAULT_USER /
 *                   TSDB_DEFAULT_PASS when unset, groupId must be non-empty
 * @param errstr     not used by this implementation
 * @param errstrLen  not used by this implementation
 * @return the new consumer, or NULL on failure (terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY for allocation failures)
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer: only the caller that flips `inited` from 0 to 1 creates it
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here, so only the configuration may be referenced in the log
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  // set conf and assign consumerId first, so that every failure log below
  // carries the real consumer id and group instead of zeroed fields
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->spEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->consumerId = tGenIdPI64();

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  // release whichever containers were created before the failure
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
969 970
#if 0
// Compiled out by the enclosing #if 0: forwards to tmqCommitInner2() using the
// commit callback and user parameter stored on the consumer.
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  int32_t code = tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
  return code;
}
L
Liu Jicong 已提交
973
#endif
L
Liu Jicong 已提交
974

L
Liu Jicong 已提交
975
/**
 * Subscribe the consumer to the given topics and wait for the server's answer.
 *
 * Every topic in the list is expanded to a full name via tNameSetDbName() /
 * tNameExtractFullName(), the request is serialized and sent to the mnode
 * endpoint, and the call blocks on a local semaphore until tmqSubscribeCb
 * reports the result. Afterwards tmqAskEp() is retried every 500 ms for as long
 * as it returns TSDB_CODE_MND_CONSUMER_NOT_READY, and the heartbeat and
 * (if enabled) auto-commit timers are started when not already running.
 *
 * @param tmq         consumer handle
 * @param topic_list  topics to subscribe to
 * @return 0 on success, the error reported by the callback, or -1 on a local failure
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
  }

  // first call computes the encoded length, second call fills the buffer
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .tmq = tmq,
  };

  // on failure sendInfo has not been handed over yet; FAIL releases it
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent: buffer and send info are no longer ours
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init hb timer
  if (tmq->hbTimer == NULL) {
    tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  // only non-NULL when the request was never handed to the transport
  if (sendInfo != NULL) {
    taosMemoryFree(sendInfo);
  }
  return code;
}

L
Liu Jicong 已提交
1071
/**
 * Store the commit callback and its user parameter in the configuration.
 *
 * @param conf   configuration to modify
 * @param cb     callback stored in conf->commitCb
 * @param param  opaque pointer stored in conf->commitCbUserParam
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1076

L
Liu Jicong 已提交
1077
#if 0
L
Liu Jicong 已提交
1078 1079
// Compiled out by the enclosing #if 0: returns the skipLogNum field of the
// message's poll response, or 0 when no message is given.
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message != NULL) {
    SMqPollRsp* pollRsp = &tmq_message->msg;
    return pollRsp->skipLogNum;
  }
  return 0;
}
L
Liu Jicong 已提交
1083
#endif
L
Liu Jicong 已提交
1084

D
dapan1121 已提交
1085
/**
 * Response callback for a poll request sent to one vgroup.
 *
 * Takes ownership of `param` (freed immediately after its fields are copied)
 * and of pMsg->pData (freed on every path). A usable response is decoded into
 * a queue item, written to tmq->mqueue and signalled through tmq->rspSem.
 *
 * @param param  SMqPollCbParam describing the request
 * @param pMsg   response buffer
 * @param code   result code of the request; non-zero means failure
 * @return 0 when the response was queued or discarded as stale, -1 on failure
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  tmq_t*          tmq = pParam->tmq;
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);

  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pEndRsp = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pEndRsp == NULL) {
        // pMsg->pData was already released above; freeing it again would be a double free
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pEndRsp->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pEndRsp);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // the payload follows the SMqRspHead; the head itself is copied over the
  // start of the decoded response afterwards
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);

  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;

CREATE_MSG_FAIL:
  // pVg is only touched while the consumer is still on the epoch the request was
  // sent in; the epoch is read atomically like everywhere else in this function
  if (epoch == atomic_load_32(&tmq->epoch)) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

1170 1171 1172 1173 1174
/**
 * Replace the consumer's topic/vgroup table with the one from an ask-ep response.
 *
 * The current offsets of all known vgroups are first saved in a temporary hash
 * keyed by "<topicName>:<vgId>". The new table is then built from pRsp; a
 * vgroup whose key is found in the hash keeps its saved offset, any other
 * vgroup starts from an offset whose type is tmq->resetOffsetCfg. Finally the
 * consumer status is set to NO_TOPIC or READY and tmq->epoch is updated.
 *
 * @param tmq    consumer handle
 * @param epoch  epoch to store in the consumer
 * @param pRsp   decoded ask-ep response
 * @return true when at least one vgroup was installed; false otherwise,
 *         including when the temporary containers cannot be allocated
 */
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // remember the offset of every vgroup we currently know about
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        // bounded formatting: the key can never overrun the stack buffer
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, 80, &pVgCur->currentOffsetNew);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal));
      }
    }
  }

  // build the new table from the response
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    // NOTE(review): the results of strdup() and taosArrayInit() below are not checked
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffsetNew = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // NOTE(review): only the outer array is destroyed here; the topicName and vgs of
  // the old entries are not released in this function -- confirm who owns them
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1256
#if 0
L
Liu Jicong 已提交
1257
// Older endpoint update, compiled out by the enclosing #if 0. For every topic in
// the response it rebuilds a per-topic hash of "<topicName>:<vgId>" -> current
// offset from the matching old topic, then creates the new vgroup entries,
// preferring the saved offset over the one carried by the response.
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool    anyVgSet = false;
  int32_t rspTopicNum = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           rspTopicNum);

  SArray* rebuilt = taosArrayInit(rspTopicNum, sizeof(SMqClientTopic));
  if (rebuilt == NULL) return false;

  SHashObj* offsetByKey = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (offsetByKey == NULL) {
    taosArrayDestroy(rebuilt);
    return false;
  }

  for (int32_t t = 0; t < rspTopicNum; t++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, t);
    topic.schema = pTopicEp->schema;
    taosHashClear(offsetByKey);
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    // collect the offsets of the first old topic with the same name (and a vg list)
    int32_t oldTopicNum = taosArrayGetSize(tmq->clientTopics);
    for (int32_t o = 0; o < oldTopicNum; o++) {
      SMqClientTopic* pOld = taosArrayGet(tmq->clientTopics, o);
      if (!pOld->vgs || strcmp(pOld->topicName, pTopicEp->topic) != 0) continue;

      int32_t oldVgNum = taosArrayGetSize(pOld->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, oldVgNum);
      for (int32_t k = 0; k < oldVgNum; k++) {
        SMqClientVg* pOldVg = taosArrayGet(pOld->vgs, k);
        sprintf(vgKey, "%s:%d", topic.topicName, pOldVg->vgId);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d build %s", tmq->consumerId, epoch, pOldVg->vgId, vgKey);
        taosHashPut(offsetByKey, vgKey, strlen(vgKey), &pOldVg->currentOffset, sizeof(int64_t));
      }
      break;
    }

    int32_t rspVgNum = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(rspVgNum, sizeof(SMqClientVg));
    for (int32_t v = 0; v < rspVgNum; v++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, v);
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pSaved = taosHashGet(offsetByKey, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
      tscDebug("consumer:%" PRId64 ", (epoch %d) original offset of vgId:%d is %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
      if (pSaved != NULL) {
        offset = *pSaved;
        tscDebug("consumer:%" PRId64 ", (epoch %d) receive offset of vgId:%d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
                 vgKey);
      }
      tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      anyVgSet = true;
    }
    taosArrayPush(rebuilt, &topic);
  }

  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(offsetByKey);
  tmq->clientTopics = rebuilt;

  if (taosArrayGetSize(tmq->clientTopics) == 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  }

  atomic_store_32(&tmq->epoch, epoch);
  return anyVgSet;
}
L
Liu Jicong 已提交
1341
#endif
X
Xiaoyu Wang 已提交
1342

D
dapan1121 已提交
1343
/**
 * Response callback for an ask-ep request.
 *
 * Synchronous request (pParam->async == 0): a response with a newer epoch is
 * decoded and applied directly with tmqUpdateEp2(), then the waiter is woken
 * through pParam->rspSem; the waiter owns pParam.
 * Asynchronous request: a response with a newer epoch is wrapped into a queue
 * item for tmq->mqueue, and pParam is released here.
 * In both cases the response buffer pMsg->pData is released before returning,
 * matching what tmqPollCb does with its response buffer.
 *
 * @param param  SMqAskEpCbParam of the request
 * @param pMsg   response buffer
 * @param code   result code of the request; stored into pParam->code
 * @return `code`, or -1 when the queue item cannot be allocated
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = pParam->tmq;
  int8_t           async = pParam->async;
  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp2(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  // the response buffer is not referenced past this point; release it here so it
  // does not leak, and clear the pointer so it cannot be released twice
  if (pMsg != NULL && pMsg->pData != NULL) {
    taosMemoryFree(pMsg->pData);
    pMsg->pData = NULL;
  }

  if (!async) {
    // pParam must not be touched after the post: the waiter frees it
    tsem_post(&pParam->rspSem);
  } else {
    // the semaphore was initialised by tmqAskEp; release it with the param
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1396
/**
 * Ask the mnode for the consumer's current topic endpoints.
 *
 * Builds an SMqAskEpReq (consumer id and epoch in network byte order, plus the
 * group id) and sends it with tmqAskEpCb as the response callback.
 *
 * @param tmq    consumer handle
 * @param async  false: block until the callback has run and return its code;
 *               true: return right after sending, the callback releases the
 *               request parameter
 * @return 0 after an asynchronous send, the callback's code for a synchronous
 *         request, or -1 when a local allocation/initialisation fails
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = 0;
#if 0
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
  if (epStatus == 1) {
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
    tscTrace("consumer:%" PRId64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
    if (epSkipCnt < 5000) return 0;
  }
  atomic_store_32(&tmq->epSkipCnt, 0);
#endif
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    // a synchronous caller would wait on an unusable semaphore otherwise
    tscError("failed to init ask ep semaphore");
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // synchronous: this function owns pParam and its semaphore
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1465 1466
#if 0
// Compiled out by the enclosing #if 0: moves the current offset of one vgroup of
// one subscribed topic to the requested position. Fails when the consumer group
// does not match or when no topic/vgroup pair matches the request.
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* target = &offset->offset;
  if (strcmp(target->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicNum = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicNum; t++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(clientTopic->topicName, target->topicName) != 0) continue;

    int32_t vgNum = taosArrayGetSize(clientTopic->vgs);
    for (int32_t v = 0; v < vgNum; v++) {
      SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, v);
      if (pVg->vgId != target->vgId) continue;

      pVg->currentOffset = target->offset;
      tmqClearUnhandleMsg(tmq);
      return TMQ_RESP_ERR__SUCCESS;
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
L
Liu Jicong 已提交
1488
#endif
L
Liu Jicong 已提交
1489

1490
/**
 * Allocate and fill a poll request for one vgroup of one topic.
 *
 * The subscription key is "<groupId><TMQ_SEPARATOR><topicName>"; the request
 * offset is the vgroup's currentOffsetNew. head.vgId and head.contLen are
 * stored via htonl().
 *
 * @param tmq      consumer handle
 * @param timeout  value stored in the request's timeout field
 * @param pTopic   topic the vgroup belongs to
 * @param pVg      vgroup to poll
 * @return the zero-initialised, filled request, or NULL on allocation failure
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pollReq == NULL) return NULL;

  // subKey = groupId + separator + topic name
  int32_t prefixLen = strlen(tmq->groupId);
  memcpy(pollReq->subKey, tmq->groupId, prefixLen);
  pollReq->subKey[prefixLen] = TMQ_SEPARATOR;
  strcpy(pollReq->subKey + prefixLen + 1, pTopic->topicName);

  // consumer-level settings
  pollReq->withTbName = tmq->withTbName;
  pollReq->timeout = timeout;
  pollReq->consumerId = tmq->consumerId;
  pollReq->epoch = tmq->epoch;

  // vgroup position and request identity
  pollReq->reqOffset = pVg->currentOffsetNew;
  pollReq->reqId = generateRequestId();

  pollReq->useSnapshot = tmq->useSnapshot;

  // message head
  pollReq->head.vgId = htonl(pVg->vgId);
  pollReq->head.contLen = htonl(sizeof(SMqPollReq));

  return pollReq;
}

L
Liu Jicong 已提交
1530 1531
/**
 * Build a meta response object from a queued poll response wrapper.
 *
 * Copies the topic name, db name and vgroup id from the wrapper's handles and
 * the whole metaRsp by value.
 *
 * @param pWrapper  queued poll response carrying a meta response
 * @return the new object, or NULL (terrno set to TSDB_CODE_OUT_OF_MEMORY) when
 *         the allocation fails
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    // report the failure instead of writing through a NULL pointer
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1541 1542 1543
/**
 * Build a data response object from a queued poll response wrapper.
 *
 * Copies the topic name, db name and vgroup id from the wrapper's handles and
 * the whole dataRsp by value; the result iterator starts at -1. When the
 * response does not carry its own schema (withSchema is false), the schema of
 * the topic handle is installed into the result info.
 *
 * @param pWrapper  queued poll response carrying a data response
 * @return the new object, or NULL (terrno set to TSDB_CODE_OUT_OF_MEMORY) when
 *         the allocation fails
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // report the failure instead of writing through a NULL pointer
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1559
/*
 * Send one asynchronous poll request to every vgroup of every subscribed
 * topic whose status can be switched from IDLE to WAIT.
 *
 * Vgroups whose status is not IDLE are skipped and their skip counter is
 * incremented. If building the request or allocating the callback/send
 * structures fails, the vgroup is put back to IDLE, rspSem is posted and
 * -1 is returned immediately. Returns 0 after all vgroups were visited.
 */
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // only a vgroup that was IDLE is polled; anything else is counted as skipped
      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__IDLE) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 skipped);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      // request payload
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      // context handed to tmqPollCb
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pParam->tmq = tmq;
      pParam->pTopic = pTopic;
      pParam->pVg = pVg;
      pParam->vgId = pVg->vgId;
      pParam->epoch = tmq->epoch;

      // transport descriptor
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      int64_t transporterId = 0;

      // log the offset being requested before the message is handed to the transport
      char offsetFormatBuf[80];
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);

      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1636 1637
/*
 * Handle a queued response that is not a poll response.
 *
 * Only TMQ_MSG_TYPE__EP_RSP is understood; any other type returns -1 and
 * leaves *pReset untouched. For an EP response whose epoch is greater than
 * the consumer's current epoch, the endpoint information is applied through
 * tmqUpdateEp2() and *pReset is set to true; otherwise *pReset is set to
 * false. Returns 0 for every EP response.
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  // not newer than what the consumer already has: nothing to apply
  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    *pReset = false;
    return 0;
  }

  SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
  tmqUpdateEp2(tmq, rspWrapper->epoch, pEpRsp);
  *pReset = true;
  return 0;
}

L
Liu Jicong 已提交
1654
/*
 * Drain queued responses until one of them yields a result object.
 *
 * Items are taken from tmq->qall; when it is empty it is refilled once from
 * tmq->mqueue, and NULL is returned if there is still nothing to process.
 *
 *  - END_RSP:       the item is freed, terrno is set to
 *                   TSDB_CODE_TQ_NO_COMMITTED_OFFSET and NULL is returned.
 *  - POLL_RSP:      if the epoch matches the consumer's, the vgroup offset is
 *                   advanced and the vgroup set back to IDLE; an empty
 *                   response (blockNum == 0) is dropped, otherwise an
 *                   SMqRspObj is built and returned.
 *  - POLL_META_RSP: same epoch check; the vgroup offset is set to a LOG
 *                   offset at the returned version and an SMqMetaRspObj is
 *                   returned.
 *  - anything else: passed to tmqHandleNoPollRsp(); when that reports a
 *                   reset and pollIfReset is true, a new poll is issued.
 *
 * Responses whose epoch differs from the consumer's are logged and discarded.
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        return NULL;
      }
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      // record the new offset before the vgroup becomes pollable again
      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffsetNew = pPoll->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->dataRsp.blockNum == 0) {
        // empty response: nothing to hand out, look at the next item
        taosFreeQitem(pPoll);
        continue;
      }

      SMqRspObj* pDataRes = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pDataRes;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffsetNew.version = pPoll->metaRsp.rspOffset;
      pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaRes = tmqBuildMetaRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pMetaRes;
    }

    // every other response type
    bool reset = false;
    tmqHandleNoPollRsp(tmq, pItem, &reset);
    taosFreeQitem(pItem);
    if (pollIfReset && reset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1724
/*
 * Poll the subscribed topics for the next result.
 *
 * tmq     - consumer handle.
 * timeout - upper bound on how long to keep trying, compared against
 *           taosGetTimestampMs() (so presumably milliseconds); -1 means no
 *           deadline.
 *
 * Returns a result object (data or meta) as TAOS_RES*, or NULL when the
 * consumer is still in the INIT state, when sending the poll requests fails,
 * when terrno is TSDB_CODE_TQ_NO_COMMITTED_OFFSET after handling responses,
 * or when the timeout expires without a result.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  // a consumer still in the INIT state has nothing to poll
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait for the time that is still LEFT, not for the time already spent:
      // waiting on the elapsed time makes each wait grow and can overshoot the deadline.
      // The *1000 scaling is kept identical to the fixed-slice wait below.
      tsem_timewait(&tmq->rspSem, (timeout - elapsed) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1768
/*
 * Close a consumer.
 *
 * When the consumer is in the READY state, offsets are committed
 * synchronously and the consumer then subscribes to an empty topic list.
 * The first non-zero code from either step is returned. In any other state
 * nothing is done. Returns 0 on success.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  // read the status the same way tmq_consumer_poll does
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    tmq_list_t* lst = tmq_list_new();
    if (lst == NULL) {
      // NOTE(review): assumes tmq_list_new() reports allocation failure as NULL - confirm.
      // -1 is the generic failure code understood by tmq_err2str().
      return -1;
    }
    rsp = tmq_subscribe(tmq, lst);
    tmq_list_destroy(lst);

    if (rsp != 0) {
      return rsp;
    }
  }
  // TODO: free resources
  return 0;
}
L
Liu Jicong 已提交
1786

L
Liu Jicong 已提交
1787 1788
/*
 * Translate a tmq return code into a message string: 0 is "success", -1 is
 * "fail", and every other value is looked up with tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1796

L
Liu Jicong 已提交
1797 1798 1799 1800 1801 1802 1803 1804 1805 1806
/*
 * Classify a result handle: TMQ_RES_DATA for a tmq data result,
 * TMQ_RES_TABLE_META for a tmq meta result, TMQ_RES_INVALID otherwise.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1807
/*
 * Return the topic name of a tmq result with everything up to and including
 * the first '.' stripped. If the stored name contains no '.', the whole
 * stored name is returned. Returns NULL when res is neither a tmq data
 * result nor a tmq meta result.
 *
 * The returned pointer refers to storage inside res.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that would be undefined behaviour
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1819 1820 1821 1822
/*
 * Return the database name of a tmq result with everything up to and
 * including the first '.' stripped. If the stored name contains no '.', the
 * whole stored name is returned. Returns NULL when res is neither a tmq data
 * result nor a tmq meta result.
 *
 * The returned pointer refers to storage inside res.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that would be undefined behaviour
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1831 1832 1833 1834
/*
 * Return the vgroup id stored in a tmq data or meta result, or -1 when res
 * is neither.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1842 1843 1844 1845 1846 1847 1848 1849

/*
 * Return the table name of the block the result iterator currently points
 * at. Returns NULL when res is not a tmq data result, when the response
 * carries no table names, or when the iterator is outside [0, blockNum).
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pData = (SMqRspObj*)res;
  bool       hasNames = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
  bool       iterInRange = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
  if (!(hasNames && iterInRange)) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
}
1854

1855 1856
/*
 * Expose the raw meta payload of a tmq meta result.
 *
 * On success raw receives the payload pointer, its length and its message
 * type exactly as stored in the result (the pointer is not copied), and
 * TSDB_CODE_SUCCESS is returned. Returns TSDB_CODE_INVALID_PARA when res is
 * not a tmq meta result or raw is NULL.
 */
int32_t tmq_get_raw_meta(TAOS_RES* res, tmq_raw_data *raw) {
  if (!TD_RES_TMQ_META(res) || !raw) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
  raw->raw_meta = pMeta->metaRsp.metaRsp;
  raw->raw_meta_len = pMeta->metaRsp.metaRspLen;
  raw->raw_meta_type = pMeta->metaRsp.resMsgType;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1866 1867 1868 1869 1870 1871 1872 1873
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);
wmmhello's avatar
wmmhello 已提交
1874

wmmhello's avatar
wmmhello 已提交
1875 1876 1877 1878
//  char uid[32] = {0};
//  sprintf(uid, "%"PRIi64, id);
//  cJSON* id_ = cJSON_CreateString(uid);
//  cJSON_AddItemToObject(json, "id", id_);
wmmhello's avatar
wmmhello 已提交
1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893
  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
  cJSON_AddItemToObject(json, "tableType", tableType);
//  cJSON* version = cJSON_CreateNumber(1);
//  cJSON_AddItemToObject(json, "version", version);

  cJSON* columns = cJSON_CreateArray();
  for(int i = 0; i < schemaRow->nCols; i++){
    cJSON* column = cJSON_CreateObject();
    SSchema *s = schemaRow->pSchema + i;
    cJSON* cname = cJSON_CreateString(s->name);
    cJSON_AddItemToObject(column, "name", cname);
    cJSON* ctype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(column, "type", ctype);
wmmhello's avatar
wmmhello 已提交
1894 1895 1896 1897 1898 1899 1900 1901 1902
    if(s->type == TSDB_DATA_TYPE_BINARY){
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
      cJSON* cbytes = cJSON_CreateNumber(length);
      cJSON_AddItemToObject(column, "length", cbytes);
    }else if (s->type == TSDB_DATA_TYPE_NCHAR){
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
      cJSON* cbytes = cJSON_CreateNumber(length);
      cJSON_AddItemToObject(column, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914
    cJSON_AddItemToArray(columns, column);
  }
  cJSON_AddItemToObject(json, "columns", columns);

  cJSON* tags = cJSON_CreateArray();
  for(int i = 0; schemaTag && i < schemaTag->nCols; i++){
    cJSON* tag = cJSON_CreateObject();
    SSchema *s = schemaTag->pSchema + i;
    cJSON* tname = cJSON_CreateString(s->name);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(s->type);
    cJSON_AddItemToObject(tag, "type", ttype);
wmmhello's avatar
wmmhello 已提交
1915 1916 1917 1918 1919 1920 1921 1922 1923
    if(s->type == TSDB_DATA_TYPE_BINARY){
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
      cJSON* cbytes = cJSON_CreateNumber(length);
      cJSON_AddItemToObject(tag, "length", cbytes);
    }else if (s->type == TSDB_DATA_TYPE_NCHAR){
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
      cJSON* cbytes = cJSON_CreateNumber(length);
      cJSON_AddItemToObject(tag, "length", cbytes);
    }
wmmhello's avatar
wmmhello 已提交
1924 1925 1926 1927 1928 1929 1930 1931 1932
    cJSON_AddItemToArray(tags, tag);
  }
  cJSON_AddItemToObject(json, "tags", tags);

  string = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  return string;
}

wmmhello's avatar
wmmhello 已提交
1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018
static char *buildAlterSTableJson(void* alterData, int32_t alterDataLen){
  SMAlterStbReq  req = {0};
  cJSON* json = NULL;
  char*  string = NULL;

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto end;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
//  cJSON* uid = cJSON_CreateNumber(id);
//  cJSON_AddItemToObject(json, "uid", uid);
  SName name = {0};
  tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  cJSON* tableName = cJSON_CreateString(name.tname);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  cJSON* alterType = cJSON_CreateNumber(req.alterType);
  cJSON_AddItemToObject(json, "alterType", alterType);
  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      TAOS_FIELD *field = taosArrayGet(req.pFields, 0);
      cJSON* colName = cJSON_CreateString(field->name);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);

      if(field->type == TSDB_DATA_TYPE_BINARY){
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
        cJSON* cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }else if (field->type == TSDB_DATA_TYPE_NCHAR){
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
        cJSON* cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN:{
      TAOS_FIELD *field = taosArrayGet(req.pFields, 0);
      cJSON* colName = cJSON_CreateString(field->name);
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{
      TAOS_FIELD *field = taosArrayGet(req.pFields, 0);
      cJSON* colName = cJSON_CreateString(field->name);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(field->type);
      cJSON_AddItemToObject(json, "colType", colType);
      if(field->type == TSDB_DATA_TYPE_BINARY){
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
        cJSON* cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }else if (field->type == TSDB_DATA_TYPE_NCHAR){
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
        cJSON* cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{
      TAOS_FIELD *oldField = taosArrayGet(req.pFields, 0);
      TAOS_FIELD *newField = taosArrayGet(req.pFields, 1);
      cJSON* colName = cJSON_CreateString(oldField->name);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(newField->name);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

wmmhello's avatar
wmmhello 已提交
2019
  end:
wmmhello's avatar
wmmhello 已提交
2020 2021 2022 2023 2024
  cJSON_Delete(json);
  tFreeSMAltertbReq(&req);
  return string;
}

wmmhello's avatar
wmmhello 已提交
2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041
/*
 * Decode a create-super-table request from a meta response (the payload
 * starts after the SMsgHead) and render it as JSON through
 * buildCreateTableJson(). Returns NULL when decoding fails.
 */
static char *processCreateStb(SMqMetaRsp *metaRsp){
  SVCreateStbReq req = {0};
  SDecoder       coder;
  char*          json = NULL;

  // skip the message head in front of the encoded request
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&coder, &req) >= 0) {
    json = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
  }

  // the decoder is released on both the success and the failure path
  tDecoderClear(&coder);
  return json;
}

wmmhello's avatar
wmmhello 已提交
2047 2048 2049
/*
 * Decode an alter-super-table meta response (encoded as an SVCreateStbReq
 * after the SMsgHead) and render the original alter request it carries
 * (alterOriData) as JSON through buildAlterSTableJson(). Returns NULL when
 * decoding fails.
 */
static char *processAlterStb(SMqMetaRsp *metaRsp){
  SVCreateStbReq req = {0};
  SDecoder       coder;
  char*          json = NULL;

  // skip the message head in front of the encoded request
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&coder, &req) >= 0) {
    json = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
  }

  // the decoder is released on both the success and the failure path
  tDecoderClear(&coder);
  return json;
}

wmmhello's avatar
wmmhello 已提交
2069
/*
 * Render a "create" description of a child table as an unformatted JSON
 * string.
 *
 * pTag    - encoded tag values of the child table.
 * sname   - name of the super table, emitted as "using".
 * name    - child table name.
 * tagName - array of tag names, indexed in step with the decoded tag values.
 * id      - currently unused.
 *
 * "tags" holds one {"name","type","value"} object per tag. A JSON tag is
 * emitted as a single entry whose value is the text returned by
 * parseTagDatatoJson(); an empty JSON tag, or a failure to decode the tag
 * values, yields an empty "tags" array.
 *
 * Returns the string produced by cJSON_PrintUnformatted() (owned by the
 * caller), or NULL when the root object or a value buffer cannot be
 * allocated.
 */
static char *buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id){
  char*   string = NULL;
  SArray* pTagVals = NULL;
  cJSON*  json = cJSON_CreateObject();
  if (json == NULL) {
    return string;
  }
  cJSON* type = cJSON_CreateString("create");
  cJSON_AddItemToObject(json, "type", type);

  cJSON* tableName = cJSON_CreateString(name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("child");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* using = cJSON_CreateString(sname);
  cJSON_AddItemToObject(json, "using", using);

  cJSON*  tags = cJSON_CreateArray();
  int32_t code = tTagToValArray(pTag, &pTagVals);
  if (code) {
    goto end;
  }

  if (tTagIsJson(pTag)) {
    STag* p = (STag*)pTag;
    if (p->nTag == 0) {
      goto end;
    }
    char*  pJson = parseTagDatatoJson(pTag);
    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, 0);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    cJSON_AddItemToObject(tag, "type", ttype);
    cJSON* tvalue = cJSON_CreateString(pJson);
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
    taosMemoryFree(pJson);
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);

    cJSON* tag = cJSON_CreateObject();

    char*  ptname = taosArrayGet(tagName, i);
    cJSON* tname = cJSON_CreateString(ptname);
    cJSON_AddItemToObject(tag, "name", tname);
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
    cJSON_AddItemToObject(tag, "type", ttype);

    char* buf = NULL;
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      // NOTE(review): dataConverToStr() may wrap variable-length values in quotes;
      // reserve two extra bytes besides the terminator - confirm against its implementation.
      buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
    } else {
      buf = taosMemoryCalloc(32, 1);
    }
    if (buf == NULL) {
      // out of memory: neither "tag" nor "tags" is attached to json yet, release them here
      cJSON_Delete(tag);
      cJSON_Delete(tags);
      goto cleanup;
    }
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
    } else {
      dataConverToStr(buf, pTagVal->type, &pTagVal->i64, tDataTypes[pTagVal->type].bytes, NULL);
    }

    cJSON* tvalue = cJSON_CreateString(buf);
    taosMemoryFree(buf);
    cJSON_AddItemToObject(tag, "value", tvalue);
    cJSON_AddItemToArray(tags, tag);
  }

  end:
  cJSON_AddItemToObject(json, "tags", tags);
  string = cJSON_PrintUnformatted(json);

  cleanup:
  cJSON_Delete(json);
  taosArrayDestroy(pTagVals);
  return string;
}

/*
 * Decode a batch create-table request from a meta response (the payload
 * starts after the SMsgHead) and render it as JSON.
 *
 * Child tables go through buildCreateCTableJson(), normal tables through
 * buildCreateTableJson(); other table types are ignored. When the batch
 * holds several requests, only the JSON of the last handled one is returned
 * and the strings built for earlier requests are released.
 *
 * Returns NULL when decoding fails or no request produced a string.
 */
static char *processCreateTable(SMqMetaRsp *metaRsp){
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  char              *string = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVCreateTbReq* pCreateReq = req.pReqs + iReq;
    char*          json = NULL;

    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      json = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name, pCreateReq->ctb.tagName, pCreateReq->uid);
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
      json = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
    } else {
      continue;
    }

    // the previous result is about to be replaced: release it instead of leaking it
    // NOTE(review): assumes strings from cJSON_PrintUnformatted() may be released with taosMemoryFree() - confirm.
    if (string != NULL) {
      taosMemoryFree(string);
    }
    string = json;
  }

  _exit:
  // single release point for the decoder on every path
  tDecoderClear(&decoder);
  return string;
}

/*
 * Decode an alter-table request from a meta response (the payload starts
 * after the SMsgHead) and render it as an unformatted JSON string.
 *
 * The output always carries "type" ("alter"), "tableName", "tableType"
 * ("child" for a tag-value update, "normal" otherwise) and "alterType"; the
 * remaining keys depend on the action:
 *   add column          : colName, colType and, for BINARY/NCHAR, colLength
 *   drop column         : colName
 *   update column bytes : colName, colType and, for BINARY/NCHAR, colLength
 *   rename column       : colName, colNewName
 *   update tag value    : colName, colValue (omitted when null), colValueNull
 *
 * Returns the string produced by cJSON_PrintUnformatted() (owned by the
 * caller), or NULL when decoding fails or memory cannot be allocated.
 */
static char *processAlterTable(SMqMetaRsp *metaRsp){
  SDecoder           decoder = {0};
  SVAlterTbReq       vAlterTbReq = {0};
  char              *string = NULL;
  cJSON             *json = NULL;

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("alter");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
  cJSON_AddItemToObject(json, "tableType", tableType);
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  cJSON_AddItemToObject(json, "alterType", alterType);

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      cJSON_AddItemToObject(json, "colType", colType);

      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      cJSON_AddItemToObject(json, "colType", colType);
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        cJSON_AddItemToObject(json, "colLength", cbytes);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      cJSON_AddItemToObject(json, "colName", colName);
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      cJSON_AddItemToObject(json, "colNewName", colNewName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      cJSON_AddItemToObject(json, "colName", tagName);

      // an empty JSON tag is reported as a null value
      bool isNull = vAlterTbReq.isNull;
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          ASSERT(tTagIsJson(vAlterTbReq.pTagVal) == true);
          buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
        } else {
          // NOTE(review): dataConverToStr() may wrap variable-length values in quotes;
          // reserve two extra bytes besides the terminator - confirm against its implementation.
          buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 3, 1);
          if (buf == NULL) {
            goto _exit;
          }
          dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
        }

        cJSON* colValue = cJSON_CreateString(buf);
        cJSON_AddItemToObject(json, "colValue", colValue);
        taosMemoryFree(buf);
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      cJSON_AddItemToObject(json, "colValueNull", isNullCJson);
      break;
    }
    default:
      break;
  }
  string = cJSON_PrintUnformatted(json);

  _exit:
  // the printed string is independent of the tree, so the tree is always released here
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

/**
 * Render a "drop super table" meta response as a JSON string.
 *
 * The bytes following the SMsgHead in metaRsp->metaRsp are decoded as an SVDropStbReq
 * and emitted as {"type":"drop","tableName":<req.name>,"tableType":"super"}.
 *
 * @param metaRsp  meta response whose metaRsp/metaRspLen describe the encoded message.
 * @return the string produced by cJSON_PrintUnformatted (the caller must release it),
 *         or NULL when decoding or creation of the JSON object fails.
 */
static char *processDropSTable(SMqMetaRsp *metaRsp){
  SDecoder           decoder = {0};
  SVDropStbReq       req = {0};
  char              *string = NULL;
  cJSON             *json = NULL;

  // decode
  void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);
  cJSON* tableName = cJSON_CreateString(req.name);
  cJSON_AddItemToObject(json, "tableName", tableName);
  cJSON* tableType = cJSON_CreateString("super");
  cJSON_AddItemToObject(json, "tableType", tableType);

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate allocation, so the tree must be released on
  // every path; cJSON_Delete(NULL) is a no-op.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

/**
 * Render a "drop table" batch meta response as a JSON string.
 *
 * The bytes following the SMsgHead in metaRsp->metaRsp are decoded as an
 * SVDropTbBatchReq and emitted as {"type":"drop","tableNameList":[<name>, ...]},
 * one array entry per request in the batch.
 *
 * @param metaRsp  meta response whose metaRsp/metaRspLen describe the encoded message.
 * @return the string produced by cJSON_PrintUnformatted (the caller must release it),
 *         or NULL when decoding or creation of the JSON containers fails.
 */
static char *processDropTable(SMqMetaRsp *metaRsp){
  SDecoder           decoder = {0};
  SVDropTbBatchReq   req = {0};
  char              *string = NULL;
  cJSON             *json = NULL;

  // decode
  void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    goto _exit;
  }

  json = cJSON_CreateObject();
  if (json == NULL) {
    goto _exit;
  }
  cJSON* type = cJSON_CreateString("drop");
  cJSON_AddItemToObject(json, "type", type);

  cJSON* tableNameList = cJSON_CreateArray();
  if (tableNameList == NULL) {
    // without the container every name item created below would be orphaned
    goto _exit;
  }
  // attach first so the array is owned by `json` and released with it
  cJSON_AddItemToObject(json, "tableNameList", tableNameList);
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;

    cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
    cJSON_AddItemToArray(tableNameList, tableName);
  }

  string = cJSON_PrintUnformatted(json);

_exit:
  // The printed string is a separate allocation, so the tree must be released on
  // every path; cJSON_Delete(NULL) is a no-op.
  cJSON_Delete(json);
  tDecoderClear(&decoder);
  return string;
}

/**
 * Convert a TMQ meta result into its JSON description.
 *
 * Dispatches on metaRsp.resMsgType to the matching process* helper.
 *
 * @param res  result handle; must satisfy TD_RES_TMQ_META.
 * @return the JSON string built by the helper, or NULL when `res` is not a meta
 *         result, the message type is not handled, or the helper fails.
 */
char *tmq_get_json_meta(TAOS_RES *res){
  if (!TD_RES_TMQ_META(res)) {
    return NULL;
  }

  SMqMetaRsp* pRsp = &((SMqMetaRspObj*)res)->metaRsp;

  if (pRsp->resMsgType == TDMT_VND_CREATE_STB) {
    return processCreateStb(pRsp);
  }
  if (pRsp->resMsgType == TDMT_VND_ALTER_STB) {
    return processAlterStb(pRsp);
  }
  if (pRsp->resMsgType == TDMT_VND_DROP_STB) {
    return processDropSTable(pRsp);
  }
  if (pRsp->resMsgType == TDMT_VND_CREATE_TABLE) {
    return processCreateTable(pRsp);
  }
  if (pRsp->resMsgType == TDMT_VND_ALTER_TABLE) {
    return processAlterTable(pRsp);
  }
  if (pRsp->resMsgType == TDMT_VND_DROP_TABLE) {
    return processDropTable(pRsp);
  }

  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2393 2394 2395 2396
// Release a JSON string handed out to the caller (tmq_get_json_meta returns such strings).
// NOTE(review): those strings are produced by cJSON_PrintUnformatted; confirm that cJSON's
// allocator is compatible with taosMemoryFreeClear.
void tmq_free_json_meta(char* jsonMeta){
  taosMemoryFreeClear(jsonMeta);
}

2397
/**
 * Replay a serialized "create super table" meta message on connection `taos`.
 *
 * The payload after the SMsgHead is decoded as an SVCreateStbReq, converted into an
 * SMCreateStbReq (igExists = true, source = TD_REQ_FROM_TAOX) and executed as a
 * TDMT_MND_CREATE_STB command against the management end point set.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest.
 * @param meta     raw message beginning with an SMsgHead.
 * @param metaLen  length of `meta` in bytes, header included.
 * @return TSDB_CODE_SUCCESS, or an error code: buildRequest failure,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 *         TSDB_CODE_INVALID_PARA when the payload cannot be decoded,
 *         TSDB_CODE_OUT_OF_MEMORY, or the code left in the request after execution.
 */
static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
  SVCreateStbReq req = {0};
  // Zero-initialised: `end` calls tDecoderClear() even on paths taken before tDecoderInit().
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build create stable: copy column and tag schemas into SField arrays
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
  if (NULL == pReq.pColumns) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema* pSchema = req.schemaRow.pSchema + i;
    SField   field   = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pColumns, &field);
  }

  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  if (NULL == pReq.pTags) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field   = {.type = pSchema->type, .bytes = pSchema->bytes};
    strcpy(field.name, pSchema->name);
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = req.suid;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // serialize: the first call (NULL buffer) only computes the required length
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}

/**
 * Replay a serialized "drop super table" meta message on connection `taos`.
 *
 * The payload after the SMsgHead is decoded as an SVDropStbReq, converted into an
 * SMDropStbReq (igNotExists = true, source = TD_REQ_FROM_TAOX) and executed as a
 * TDMT_MND_DROP_STB command against the management end point set.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest.
 * @param meta     raw message beginning with an SMsgHead.
 * @param metaLen  length of `meta` in bytes, header included.
 * @return TSDB_CODE_SUCCESS, or an error code: buildRequest failure,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 *         TSDB_CODE_INVALID_PARA when the payload cannot be decoded,
 *         TSDB_CODE_OUT_OF_MEMORY, or the code left in the request after execution.
 */
static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
  SVDropStbReq req = {0};
  // Zero-initialised: `end` calls tDecoderClear() even on paths taken before tDecoderInit().
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.suid = req.suid;

  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName;
  tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);

  // serialize: the first call (NULL buffer) only computes the required length
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}

// Create-table requests grouped for one vgroup; taosCreateTable stores one of these
// per vgId in its hash map.
typedef struct SVgroupCreateTableBatch {
  // req.pArray collects the SVCreateTbReq entries routed to this vgroup
  SVCreateTbBatchReq req;
  // vgroup returned by catalogGetTableHashVgroup for the tables in this batch
  SVgroupInfo        info;
  // database name copied from the request's pDb
  char               dbName[TSDB_DB_NAME_LEN];
} SVgroupCreateTableBatch;

// Free callback for SVgroupCreateTableBatch hash entries (installed with
// taosHashSetFreeFp in taosCreateTable): destroys the batch's request array.
static void destroyCreateTbReqBatch(void* data) {
  taosArrayDestroy(((SVgroupCreateTableBatch*)data)->req.pArray);
}

L
Liu Jicong 已提交
2550 2551 2552 2553 2554 2555 2556
/**
 * Replay a serialized "create table" batch meta message on connection `taos`.
 *
 * The payload after the SMsgHead is decoded as an SVCreateTbBatchReq. Each table is
 * routed to its vgroup via catalogGetTableHashVgroup, the requests are grouped per
 * vgId, serialized with serializeVgroupsCreateTableBatch and executed as a
 * TDMT_VND_CREATE_TABLE query in QUERY_EXEC_MODE_SCHEDULE.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest.
 * @param meta     raw message beginning with an SMsgHead.
 * @param metaLen  length of `meta` in bytes, header included.
 * @return TSDB_CODE_SUCCESS, or an error code: buildRequest/catalog/rewrite failure,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED, TSDB_CODE_INVALID_PARA for an undecodable
 *         payload, TSDB_CODE_OUT_OF_MEMORY, or the code left in the request after execution.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // group every create request under the vgroup that owns the table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      // first table seen for this vgroup: start a new batch
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      strcpy(tBatch.dbName, pRequest->pDb);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pCreateReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pCreateReq);
    }
  }

  SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    // NOTE(review): pBufArray is not released on this path; how its elements must be
    // destroyed is not visible from here.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

// Drop-table requests grouped for one vgroup; taosDropTable stores one of these
// per vgId in its hash map.
typedef struct SVgroupDropTableBatch {
  // req.pArray collects the SVDropTbReq entries routed to this vgroup
  SVDropTbBatchReq req;
  // vgroup returned by catalogGetTableHashVgroup for the tables in this batch
  SVgroupInfo      info;
  // not assigned by taosDropTable (the batch is zero-initialised there)
  char             dbName[TSDB_DB_NAME_LEN];
} SVgroupDropTableBatch;

// Free callback for SVgroupDropTableBatch hash entries (installed with
// taosHashSetFreeFp in taosDropTable): destroys the batch's request array.
static void destroyDropTbReqBatch(void* data) {
  taosArrayDestroy(((SVgroupDropTableBatch*)data)->req.pArray);
}

L
Liu Jicong 已提交
2664 2665 2666 2667 2668 2669 2670
/**
 * Replay a serialized "drop table" batch meta message on connection `taos`.
 *
 * The payload after the SMsgHead is decoded as an SVDropTbBatchReq. Every request is
 * forced to igNotExists = true, routed to its vgroup via catalogGetTableHashVgroup,
 * grouped per vgId, serialized with serializeVgroupsDropTableBatch and executed as a
 * TDMT_VND_DROP_TABLE query in QUERY_EXEC_MODE_SCHEDULE.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest.
 * @param meta     raw message beginning with an SMsgHead.
 * @param metaLen  length of `meta` in bytes, header included.
 * @return TSDB_CODE_SUCCESS, or an error code: buildRequest/catalog/rewrite failure,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED, TSDB_CODE_INVALID_PARA for an undecodable
 *         payload, TSDB_CODE_OUT_OF_MEMORY, or the code left in the request after execution.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pVgroupHashmap) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // group every drop request under the vgroup that owns the table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName;
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      // first table seen for this vgroup: start a new batch
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      if (NULL == tBatch.req.pArray) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      taosArrayPush(tBatch.req.pArray, pDropReq);

      taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
    } else {  // add to the correct vgroup
      taosArrayPush(pTableBatch->req.pArray, pDropReq);
    }
  }

  SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
  if (NULL == pBufArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    // NOTE(review): pBufArray is not released on this path; how its elements must be
    // destroyed is not visible from here.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);

  code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  pQuery = NULL;  // no need to free in the end
  code = pRequest->code;

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

/**
 * Replay a serialized "alter table" meta message on connection `taos`.
 *
 * The payload after the SMsgHead is decoded as an SVAlterTbReq only to learn the
 * action and table name. The original message bytes are then copied, the header's
 * vgId is overwritten with the target vgroup id (htonl), and the copy is executed as
 * a TDMT_VND_ALTER_TABLE query in QUERY_EXEC_MODE_SCHEDULE.
 *
 * TSDB_ALTER_TABLE_UPDATE_OPTIONS requests are skipped and reported as success, and a
 * TSDB_CODE_VND_TABLE_NOT_EXIST result from execution is mapped to 0.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest.
 * @param meta     raw message beginning with an SMsgHead.
 * @param metaLen  length of `meta` in bytes, header included.
 * @return TSDB_CODE_SUCCESS, or an error code: buildRequest/catalog/rewrite failure,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED, TSDB_CODE_INVALID_PARA for an undecodable
 *         payload, TSDB_CODE_OUT_OF_MEMORY, or the code left in the request after execution.
 */
static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
  SVAlterTbReq        req             = {0};
  SDecoder            coder           = {0};
  int32_t             code            = TSDB_CODE_SUCCESS;
  SRequestObj        *pRequest        = NULL;
  SQuery             *pQuery          = NULL;
  SArray             *pArray          = NULL;
  SVgDataBlocks      *pVgData         = NULL;

  code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // option updates are not replayed; `code` is still TSDB_CODE_SUCCESS here
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj  *pTscObj = pRequest->pTscObj;
  SCatalog *pCatalog = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pArray = taosArrayInit(1, sizeof(void*));
  if (NULL == pArray) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  if (NULL == pVgData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pVgData->vg = pInfo;
  pVgData->pData = taosMemoryMalloc(metaLen);
  if (NULL == pVgData->pData) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  // forward the original message, retargeted at the vgroup that owns the table
  memcpy(pVgData->pData, meta, metaLen);
  ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
  pVgData->size = metaLen;
  pVgData->numOfTables = 1;
  taosArrayPush(pArray, &pVgData);

  pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
  if (NULL == pQuery) {
    // pArray and pVgData are still owned here and are released at `end`
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);

  code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  launchQueryImpl(pRequest, pQuery, false, NULL);
  // cleared so that the cleanup at `end` does not free them again
  pQuery  = NULL;
  pVgData = NULL;
  pArray  = NULL;
  code    = pRequest->code;
  if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
    code = 0;
  }

end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}

2871 2872
/**
 * Apply a raw meta message to the connection `taos`.
 *
 * Dispatches on raw_meta.raw_meta_type to the matching taos* replay helper, passing
 * raw_meta.raw_meta and raw_meta.raw_meta_len through.
 *
 * @param taos      connection handle; must not be NULL.
 * @param raw_meta  raw meta descriptor (type, buffer, length).
 * @return the helper's result, or TSDB_CODE_INVALID_PARA when `taos` is NULL or the
 *         message type is not handled.
 */
int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){
  if (NULL == taos) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*   meta = raw_meta.raw_meta;
  int32_t metaLen = raw_meta.raw_meta_len;

  // Both create and alter of a super table go through taosCreateStb.
  // NOTE(review): presumably relies on igExists in the create path - confirm.
  if (raw_meta.raw_meta_type == TDMT_VND_CREATE_STB || raw_meta.raw_meta_type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, meta, metaLen);
  }
  if (raw_meta.raw_meta_type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, meta, metaLen);
  }
  if (raw_meta.raw_meta_type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, meta, metaLen);
  }
  if (raw_meta.raw_meta_type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, meta, metaLen);
  }
  if (raw_meta.raw_meta_type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, meta, metaLen);
  }

  return TSDB_CODE_INVALID_PARA;
}

wmmhello's avatar
wmmhello 已提交
2892 2893 2894 2895
// Release a tmq_raw_data object through taosMemoryFreeClear.
// NOTE(review): only the struct itself is freed here; confirm who owns the buffer
// referenced by its raw_meta field.
void tmq_free_raw_meta(tmq_raw_data *rawMeta){
  taosMemoryFreeClear(rawMeta);
}

L
Liu Jicong 已提交
2896 2897
// Commit through tmqCommitInner2 with the fourth argument set to 1 and the caller's
// callback/param forwarded (presumably the asynchronous mode - confirm against
// tmqCommitInner2). The return value is intentionally not propagated.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}

wmmhello's avatar
wmmhello 已提交
2900
// Commit through tmqCommitInner2 with the fourth argument set to 0 and no callback
// (presumably the synchronous mode - confirm against tmqCommitInner2); returns its result.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL);
  return code;
}