tmq.c 51.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
31 32 33
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
L
Liu Jicong 已提交
34 35 36
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
37

L
Liu Jicong 已提交
38 39 40 41 42 43
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
44 45 46
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
47 48
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
49
struct tmq_list_t {
L
Liu Jicong 已提交
50
  SArray container;
L
Liu Jicong 已提交
51
};
L
Liu Jicong 已提交
52

L
Liu Jicong 已提交
53
struct tmq_conf_t {
54 55 56 57 58
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
59 60
  int8_t  snapEnable;
  int32_t snapBatchSize;
61 62 63

  bool hbBgEnable;

64 65 66 67 68
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
69
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
70
  void*          commitCbUserParam;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
74
  int64_t refId;
L
Liu Jicong 已提交
75
  // conf
76 77 78 79 80 81 82 83 84 85 86
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
87 88
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
89 90 91 92

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
93 94
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
95
  int32_t epSkipCnt;
L
Liu Jicong 已提交
96
#endif
L
Liu Jicong 已提交
97 98
  int64_t pollCnt;

L
Liu Jicong 已提交
99
  // timer
100 101
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
102 103 104
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
105 106 107 108
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
109
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
110
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
111
  STaosQall*  qall;
L
Liu Jicong 已提交
112 113 114 115
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
116 117
};

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
126
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
127
  TMQ_CONSUMER_STATUS__RECOVER,
L
Liu Jicong 已提交
128 129
};

L
Liu Jicong 已提交
130
enum {
131
  TMQ_DELAYED_TASK__ASK_EP = 1,
L
Liu Jicong 已提交
132 133 134 135
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
136
typedef struct {
137 138 139
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
142
  // connection info
143
  int32_t vgId;
X
Xiaoyu Wang 已提交
144
  int32_t vgStatus;
L
Liu Jicong 已提交
145
  int32_t vgSkipCnt;
146 147 148
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
149
typedef struct {
150
  // subscribe info
151 152
  char topicName[TSDB_TOPIC_FNAME_LEN];
  char db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
153 154 155

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
156
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
  };
L
Liu Jicong 已提交
168 169
} SMqPollRspWrapper;

L
Liu Jicong 已提交
170
typedef struct {
171 172
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
173 174
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
175
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
176

L
Liu Jicong 已提交
177
typedef struct {
178 179
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
180
  int32_t code;
L
Liu Jicong 已提交
181
  int32_t async;
X
Xiaoyu Wang 已提交
182
  tsem_t  rspSem;
183 184
} SMqAskEpCbParam;

L
Liu Jicong 已提交
185
typedef struct {
186 187
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
188
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
189
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
190
  int32_t         vgId;
L
Liu Jicong 已提交
191
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
192
} SMqPollCbParam;
193

194
typedef struct {
195 196
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
197 198
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
199 200
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
201
  int32_t        rspErr;
202
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
203 204 205 206
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
207 208 209 210 211
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
212
} SMqCommitCbParam;
213

214
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
215
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
216
  conf->withTbName = false;
L
Liu Jicong 已提交
217
  conf->autoCommit = true;
L
Liu Jicong 已提交
218
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
219
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
220
  conf->hbBgEnable = true;
221 222 223
  return conf;
}

L
Liu Jicong 已提交
224
void tmq_conf_destroy(tmq_conf_t* conf) {
L
Liu Jicong 已提交
225 226 227 228 229 230
  if (conf) {
    if (conf->ip) taosMemoryFree(conf->ip);
    if (conf->user) taosMemoryFree(conf->user);
    if (conf->pass) taosMemoryFree(conf->pass);
    taosMemoryFree(conf);
  }
L
Liu Jicong 已提交
231 232 233
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
234 235
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
236
    return TMQ_CONF_OK;
237
  }
L
Liu Jicong 已提交
238

239 240
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
241 242
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
243

L
Liu Jicong 已提交
244 245
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
246
      conf->autoCommit = true;
L
Liu Jicong 已提交
247 248
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
249
      conf->autoCommit = false;
L
Liu Jicong 已提交
250 251 252 253
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
254
  }
L
Liu Jicong 已提交
255

L
Liu Jicong 已提交
256 257 258 259 260
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
275

276 277
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
278
      conf->withTbName = true;
L
Liu Jicong 已提交
279
      return TMQ_CONF_OK;
280
    } else if (strcmp(value, "false") == 0) {
281
      conf->withTbName = false;
L
Liu Jicong 已提交
282
      return TMQ_CONF_OK;
283 284 285 286 287
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
288
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
289
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
290
      conf->snapEnable = true;
291 292
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
293
      conf->snapEnable = false;
294 295 296 297 298 299
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
300 301 302 303 304
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

305 306 307
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
308 309
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
310
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
311 312 313 314
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
315
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
316 317
  }

L
Liu Jicong 已提交
318
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
319 320 321
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
322
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
323 324 325
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
326
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
327 328 329
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
330
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
331 332 333
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
334
  if (strcmp(key, "td.connect.db") == 0) {
335
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
336 337 338
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
339
  return TMQ_CONF_UNKNOWN;
340 341 342
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
343 344
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
345 346
}

L
Liu Jicong 已提交
347 348
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
349 350 351 352 353
  if (src == NULL || src[0] == 0) return -1;
  char* topic = strdup(src);
  if (topic[0] != '`') {
    strtolower(topic, src);
  }
L
fix  
Liu Jicong 已提交
354
  if (taosArrayPush(container, &topic) == NULL) return -1;
355 356 357
  return 0;
}

L
Liu Jicong 已提交
358
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
359
  SArray* container = &list->container;
L
Liu Jicong 已提交
360
  taosArrayDestroyP(container, taosMemoryFree);
L
Liu Jicong 已提交
361 362
}

L
Liu Jicong 已提交
363 364 365 366 367 368 369 370 371 372
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return container->pData;
}

L
Liu Jicong 已提交
373 374 375 376
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId);
  if (tmq == NULL) {
    if (!pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
    }
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // if no more waiting rsp
  if (pParamSet->async) {
    // call async cb func
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      // sem post
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

#if 0
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
  return 0;
}

409 410
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
411 412
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
  // push into array
L
Liu Jicong 已提交
413
#if 0
414 415 416 417 418
  if (code == 0) {
    taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
  } else {
    taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
  }
L
Liu Jicong 已提交
419
#endif
L
Liu Jicong 已提交
420

L
Liu Jicong 已提交
421 422 423
  taosMemoryFree(pParam->pOffset);
  if (pBuf->pData) taosMemoryFree(pBuf->pData);

S
Shengliang Guan 已提交
424
  /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
L
Liu Jicong 已提交
425 426
   * pOffset->version);*/

427
  // count down waiting rsp
L
Liu Jicong 已提交
428
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
429 430 431
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
432
    tmqCommitDone(pParamSet);
433 434 435 436
  }
  return 0;
}

L
Liu Jicong 已提交
437 438 439 440 441 442
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
L
Liu Jicong 已提交
443
  pOffset->val = pVg->currentOffset;
444

L
Liu Jicong 已提交
445 446 447 448
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
L
Liu Jicong 已提交
449

L
Liu Jicong 已提交
450 451 452 453 454 455 456 457 458 459
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    return -1;
  }
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) return -1;
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
460

L
Liu Jicong 已提交
461 462 463 464 465 466 467
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
468
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
469 470 471 472 473 474 475 476 477 478 479 480 481 482
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    return -1;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

S
Shengliang Guan 已提交
483 484
  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);
L
Liu Jicong 已提交
485 486

  // TODO: put into cb
L
Liu Jicong 已提交
487
  pVg->committedOffset = pVg->currentOffset;
L
Liu Jicong 已提交
488 489 490 491

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
L
Liu Jicong 已提交
492
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
493
  pMsgSendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
494 495 496
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
  // send msg

L
Liu Jicong 已提交
497 498 499
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

L
Liu Jicong 已提交
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;
}

int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
526 527
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
L
Liu Jicong 已提交
528 529 530 531 532 533
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

L
Liu Jicong 已提交
534 535
  int32_t code = -1;

L
Liu Jicong 已提交
536 537 538 539 540 541 542
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

L
Liu Jicong 已提交
543
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
L
Liu Jicong 已提交
544 545
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
L
Liu Jicong 已提交
546
        }
L
Liu Jicong 已提交
547
        goto HANDLE_RSP;
L
Liu Jicong 已提交
548 549
      }
    }
L
Liu Jicong 已提交
550
  }
L
Liu Jicong 已提交
551

L
Liu Jicong 已提交
552 553 554 555
HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
L
Liu Jicong 已提交
556 557 558
    return 0;
  }

L
Liu Jicong 已提交
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    return code;
  } else {
    code = 0;
  }

FAIL:
  if (code != 0 && async) {
    userCb(tmq, code, userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
575 576
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
L
Liu Jicong 已提交
577 578 579 580 581 582
  int32_t code = -1;

  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

583 584 585 586 587
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
588 589 590 591

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

592 593 594 595 596 597
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

598 599 600
  // init as 1 to prevent concurrency issue
  pParamSet->waitingRspNum = 1;

601 602
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
603

S
Shengliang Guan 已提交
604
    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
L
Liu Jicong 已提交
605 606
             (int32_t)taosArrayGetSize(pTopic->vgs));

607
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
608 609
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

S
Shengliang Guan 已提交
610 611
      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);
L
Liu Jicong 已提交
612

L
Liu Jicong 已提交
613 614 615
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer: %ld, vg:%d, current %ld, committed %ld", tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
L
Liu Jicong 已提交
616 617 618
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
619 620 621 622
      }
    }
  }

L
Liu Jicong 已提交
623 624 625 626 627 628
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

629 630 631 632 633 634
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }

635 636 637 638
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
639
    taosMemoryFree(pParamSet);
640 641 642 643 644 645
  } else {
    code = 0;
  }

  if (code != 0 && async) {
    if (automatic) {
L
Liu Jicong 已提交
646
      tmq->commitCb(tmq, code, tmq->commitCbUserParam);
647
    } else {
L
Liu Jicong 已提交
648
      userCb(tmq, code, userParam);
649 650 651
    }
  }

L
Liu Jicong 已提交
652
#if 0
653 654 655 656
  if (!async) {
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
  }
L
Liu Jicong 已提交
657
#endif
658 659 660 661

  return 0;
}

662
void tmqAssignAskEpTask(void* param, void* tmrId) {
663 664 665 666 667 668 669 670 671
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
672 673 674
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
675 676 677 678 679 680 681 682 683
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
684 685 686
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
687 688 689 690 691 692 693 694 695
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
696 697
}

698 699 700 701 702 703
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg && pMsg->pData) taosMemoryFree(pMsg->pData);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
704 705 706 707 708
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    return;
  }
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
738
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
739 740
}

L
Liu Jicong 已提交
741 742 743 744 745 746 747 748
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

749
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
L
Liu Jicong 已提交
750
      tmqAskEp(tmq, true);
751 752 753 754 755

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      *pRefId = tmq->refId;

      taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
L
Liu Jicong 已提交
756
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
L
Liu Jicong 已提交
757
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
758 759 760 761 762

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      *pRefId = tmq->refId;

      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
L
Liu Jicong 已提交
763 764 765 766
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
L
Liu Jicong 已提交
767
    taosFreeQitem(pTaskType);
L
Liu Jicong 已提交
768 769 770 771 772
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
773
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
774
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
775 776 777 778 779 780 781 782
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
783
  msg = NULL;
L
Liu Jicong 已提交
784 785 786 787 788 789 790 791 792 793
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
794
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
L
Liu Jicong 已提交
795 796 797 798 799
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  tsem_post(&pParam->rspSem);
  return 0;
}
800

L
Liu Jicong 已提交
801
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
X
Xiaoyu Wang 已提交
802 803 804 805
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
L
Liu Jicong 已提交
806
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
807
    tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
X
Xiaoyu Wang 已提交
808
  }
L
Liu Jicong 已提交
809
  return 0;
X
Xiaoyu Wang 已提交
810 811
}

L
Liu Jicong 已提交
812 813 814
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  int32_t     rsp = tmq_subscribe(tmq, lst);
L
Liu Jicong 已提交
815 816
  tmq_list_destroy(lst);
  return rsp;
X
Xiaoyu Wang 已提交
817 818
}

819 820
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
821

822 823 824 825
  // TODO stop timer
  if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
  if (tmq->qall) taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
826

827
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
828

829 830 831 832 833 834 835 836 837 838
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
    int32_t vgSz = taosArrayGetSize(pTopic->vgs);
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
839 840
}

L
Liu Jicong 已提交
841
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
L
Liu Jicong 已提交
842 843 844 845 846 847 848 849 850
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
851
    tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
L
Liu Jicong 已提交
852 853
  }

L
Liu Jicong 已提交
854 855
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
L
Liu Jicong 已提交
856
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
857 858
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
859 860
    return NULL;
  }
L
Liu Jicong 已提交
861

L
Liu Jicong 已提交
862 863 864 865 866
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
L
Liu Jicong 已提交
867
  ASSERT(conf->groupId[0]);
L
Liu Jicong 已提交
868

L
Liu Jicong 已提交
869 870 871 872 873 874
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
L
Liu Jicong 已提交
875
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
876 877
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
878 879
    goto FAIL;
  }
L
Liu Jicong 已提交
880

L
Liu Jicong 已提交
881 882
  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
L
Liu Jicong 已提交
883 884
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
L
Liu Jicong 已提交
885 886
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/
L
Liu Jicong 已提交
887

L
Liu Jicong 已提交
888 889 890
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
891
  pTmq->withTbName = conf->withTbName;
L
Liu Jicong 已提交
892
  pTmq->useSnapshot = conf->snapEnable;
L
Liu Jicong 已提交
893
  pTmq->autoCommit = conf->autoCommit;
L
Liu Jicong 已提交
894
  pTmq->autoCommitInterval = conf->autoCommitInterval;
L
Liu Jicong 已提交
895 896
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
L
Liu Jicong 已提交
897 898
  pTmq->resetOffsetCfg = conf->resetOffset;

899 900
  pTmq->hbBgEnable = conf->hbBgEnable;

L
Liu Jicong 已提交
901
  // assign consumerId
L
Liu Jicong 已提交
902
  pTmq->consumerId = tGenIdPI64();
X
Xiaoyu Wang 已提交
903

L
Liu Jicong 已提交
904 905
  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
S
Shengliang Guan 已提交
906 907
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
908 909
    goto FAIL;
  }
L
Liu Jicong 已提交
910

L
Liu Jicong 已提交
911 912 913
  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
S
Shengliang Guan 已提交
914 915
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
916 917 918
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }
L
Liu Jicong 已提交
919

920 921 922 923 924 925 926 927 928
  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    tmqFreeImpl(pTmq);
    return NULL;
  }

  int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
  *pRefId = pTmq->refId;

929
  if (pTmq->hbBgEnable) {
930
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
931 932
  }

S
Shengliang Guan 已提交
933
  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
L
Liu Jicong 已提交
934

935
  return pTmq;
L
Liu Jicong 已提交
936 937 938 939 940 941 942 943

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
944 945
}

L
Liu Jicong 已提交
946
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
L
Liu Jicong 已提交
947 948 949 950 951
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;
952 953

  req.consumerId = tmq->consumerId;
L
Liu Jicong 已提交
954
  tstrncpy(req.clientId, tmq->clientId, 256);
L
Liu Jicong 已提交
955
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
956
  req.topicNames = taosArrayInit(sz, sizeof(void*));
L
Liu Jicong 已提交
957
  if (req.topicNames == NULL) goto FAIL;
958

L
Liu Jicong 已提交
959 960
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
961 962

    SName name = {0};
L
Liu Jicong 已提交
963
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
964

L
Liu Jicong 已提交
965 966 967
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
968
    }
L
Liu Jicong 已提交
969
    tNameExtractFullName(&name, topicFName);
970

L
Liu Jicong 已提交
971 972 973
    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
974 975
  }

L
Liu Jicong 已提交
976 977 978 979
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

980 981 982
  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

983
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
984
  if (sendInfo == NULL) goto FAIL;
985

X
Xiaoyu Wang 已提交
986
  SMqSubscribeCbParam param = {
L
Liu Jicong 已提交
987
      .rspErr = 0,
988 989
      .refId = tmq->refId,
      .epoch = tmq->epoch,
X
Xiaoyu Wang 已提交
990
  };
L
Liu Jicong 已提交
991

L
Liu Jicong 已提交
992 993 994
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
995 996 997 998
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
999

L
Liu Jicong 已提交
1000 1001
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1002 1003
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
L
Liu Jicong 已提交
1004 1005
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

1006 1007 1008 1009 1010
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
1011 1012 1013
  // avoid double free if msg is sent
  buf = NULL;

L
Liu Jicong 已提交
1014 1015
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
1016

L
Liu Jicong 已提交
1017 1018 1019
  code = param.rspErr;
  if (code != 0) goto FAIL;

L
Liu Jicong 已提交
1020
  int32_t retryCnt = 0;
L
Liu Jicong 已提交
1021
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
L
Liu Jicong 已提交
1022 1023 1024
    if (retryCnt++ > 10) {
      goto FAIL;
    }
L
fix  
Liu Jicong 已提交
1025
    tscDebug("consumer not ready, retry");
L
Liu Jicong 已提交
1026 1027
    taosMsleep(500);
  }
1028

1029 1030
  // init ep timer
  if (tmq->epTimer == NULL) {
1031 1032 1033
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
1034
  }
L
Liu Jicong 已提交
1035 1036

  // init auto commit timer
1037
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
1038 1039 1040
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
L
Liu Jicong 已提交
1041 1042
  }

L
Liu Jicong 已提交
1043 1044 1045
  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
L
Liu Jicong 已提交
1046
  if (code != 0 && buf) {
L
Liu Jicong 已提交
1047 1048 1049
    taosMemoryFree(buf);
  }
  return code;
1050 1051
}

L
Liu Jicong 已提交
1052
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
L
Liu Jicong 已提交
1053
  //
1054
  conf->commitCb = cb;
L
Liu Jicong 已提交
1055
  conf->commitCbUserParam = param;
L
Liu Jicong 已提交
1056
}
1057

D
dapan1121 已提交
1058
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
1059 1060
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
1061
  SMqClientTopic* pTopic = pParam->pTopic;
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
L
Liu Jicong 已提交
1074
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
1075
  if (code != 0) {
L
Liu Jicong 已提交
1076
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
L
Liu Jicong 已提交
1077
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1078 1079 1080 1081
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
L
Liu Jicong 已提交
1082 1083 1084 1085
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        taosMemoryFree(pMsg->pData);
S
Shengliang Guan 已提交
1086
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
L
Liu Jicong 已提交
1087 1088 1089 1090 1091 1092 1093 1094
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      /*pRspWrapper->vgHandle = pVg;*/
      /*pRspWrapper->topicHandle = pTopic;*/
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
L
fix txn  
Liu Jicong 已提交
1095
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1096 1097
  }

X
Xiaoyu Wang 已提交
1098 1099 1100
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
1101
    // do not write into queue since updating epoch reset
S
Shengliang Guan 已提交
1102
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
1103
            tmqEpoch);
1104
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1105
    taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
1106 1107 1108 1109
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
S
Shengliang Guan 已提交
1110
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
1111 1112
  }

L
Liu Jicong 已提交
1113 1114 1115
  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

1116
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1117
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1118
    taosMemoryFree(pMsg->pData);
S
Shengliang Guan 已提交
1119
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
1120
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1121
  }
L
Liu Jicong 已提交
1122

L
Liu Jicong 已提交
1123
  pRspWrapper->tmqRspType = rspType;
L
Liu Jicong 已提交
1124 1125
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
1126

L
Liu Jicong 已提交
1127
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1128 1129 1130
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
wmmhello's avatar
wmmhello 已提交
1131
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1132
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1133 1134
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
1135 1136 1137 1138
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1139
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1140
  }
L
Liu Jicong 已提交
1141

L
Liu Jicong 已提交
1142
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1143

S
Shengliang Guan 已提交
1144 1145 1146
  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);
L
fix  
Liu Jicong 已提交
1147

L
Liu Jicong 已提交
1148
  taosWriteQitem(tmq->mqueue, pRspWrapper);
1149
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1150

L
Liu Jicong 已提交
1151
  return 0;
L
fix txn  
Liu Jicong 已提交
1152
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
1153
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
1154 1155
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
1156
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1157
  return -1;
1158 1159
}

L
Liu Jicong 已提交
1160
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
1161 1162 1163 1164
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
S
Shengliang Guan 已提交
1165
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
S
Shengliang Guan 已提交
1184
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
1185 1186 1187
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
L
Liu Jicong 已提交
1188
        char buf[80];
L
Liu Jicong 已提交
1189
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
L
Liu Jicong 已提交
1190 1191
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
L
Liu Jicong 已提交
1192
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
1193 1194 1195 1196 1197 1198 1199 1200
      }
    }
  }

  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
1201
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
1202 1203
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

S
Shengliang Guan 已提交
1204
    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);
1205 1206 1207 1208 1209 1210

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
L
Liu Jicong 已提交
1211 1212
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
1213
      if (pOffset != NULL) {
L
Liu Jicong 已提交
1214
        offsetNew = *pOffset;
1215 1216 1217 1218
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1219
          .currentOffset = offsetNew,
1220 1221 1222 1223 1224 1225 1226 1227 1228 1229
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }
1230 1231 1232 1233 1234 1235 1236
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
      int32_t vgSz = taosArrayGetSize(pTopic->vgs);
      taosArrayDestroy(pTopic->vgs);
L
Liu Jicong 已提交
1237
    }
1238
    taosArrayDestroy(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1239
  }
L
Liu Jicong 已提交
1240
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1241
  tmq->clientTopics = newTopics;
1242

1243 1244 1245 1246
  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1247

X
Xiaoyu Wang 已提交
1248 1249 1250 1251
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1252
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
1253
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1254
  int8_t           async = pParam->async;
1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    if (!async) {
      tsem_destroy(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    taosMemoryFree(pMsg->pData);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

L
Liu Jicong 已提交
1268
  pParam->code = code;
1269
  if (code != 0) {
S
Shengliang Guan 已提交
1270
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
1271
    goto END;
1272
  }
L
Liu Jicong 已提交
1273

L
Liu Jicong 已提交
1274
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1275
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1276
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1277 1278
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
S
Shengliang Guan 已提交
1279
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1280 1281
  if (head->epoch <= epoch) {
    goto END;
1282
  }
L
Liu Jicong 已提交
1283

L
Liu Jicong 已提交
1284
  if (!async) {
L
Liu Jicong 已提交
1285 1286
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
S
Shengliang Guan 已提交
1287 1288
    /*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
1289
    tmqUpdateEp(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1290
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1291
  } else {
1292
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1293
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1294
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1295 1296
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1297
    }
L
Liu Jicong 已提交
1298 1299 1300
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1301
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1302

L
Liu Jicong 已提交
1303
    taosWriteQitem(tmq->mqueue, pWrapper);
1304
    tsem_post(&tmq->rspSem);
1305
  }
L
Liu Jicong 已提交
1306 1307

END:
L
Liu Jicong 已提交
1308
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1309
  if (!async) {
L
Liu Jicong 已提交
1310
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
1311 1312
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
1313
  }
L
Liu Jicong 已提交
1314
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1315
  return code;
1316 1317
}

L
Liu Jicong 已提交
1318
int32_t tmqAskEp(tmq_t* tmq, bool async) {
L
Liu Jicong 已提交
1319
  int32_t code = 0;
L
Liu Jicong 已提交
1320
#if 0
L
Liu Jicong 已提交
1321
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
L
fix txn  
Liu Jicong 已提交
1322
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
1323
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
S
Shengliang Guan 已提交
1324
    tscTrace("consumer:%" PRId64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
1325
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
1326
  }
L
temp  
Liu Jicong 已提交
1327
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
1328
#endif
L
Liu Jicong 已提交
1329
  int32_t      tlen = sizeof(SMqAskEpReq);
L
Liu Jicong 已提交
1330
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
L
Liu Jicong 已提交
1331
  if (req == NULL) {
L
Liu Jicong 已提交
1332
    tscError("failed to malloc get subscribe ep buf");
L
Liu Jicong 已提交
1333
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1334
    return -1;
L
Liu Jicong 已提交
1335
  }
L
Liu Jicong 已提交
1336 1337 1338
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);
1339

L
Liu Jicong 已提交
1340
  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
1341 1342
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
wafwerar's avatar
wafwerar 已提交
1343
    taosMemoryFree(req);
L
Liu Jicong 已提交
1344
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1345
    return -1;
L
Liu Jicong 已提交
1346
  }
1347 1348
  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
L
Liu Jicong 已提交
1349
  pParam->async = async;
X
Xiaoyu Wang 已提交
1350
  tsem_init(&pParam->rspSem, 0, 0);
1351

1352
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1353 1354
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1355 1356
    taosMemoryFree(pParam);
    taosMemoryFree(req);
L
Liu Jicong 已提交
1357
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1358 1359 1360 1361 1362 1363 1364 1365 1366 1367
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1368 1369 1370
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1371
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;
1372

L
Liu Jicong 已提交
1373 1374
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

S
Shengliang Guan 已提交
1375
  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);
L
add log  
Liu Jicong 已提交
1376

L
Liu Jicong 已提交
1377 1378
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1379

L
Liu Jicong 已提交
1380
  if (!async) {
L
Liu Jicong 已提交
1381 1382 1383 1384 1385
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    taosMemoryFree(pParam);
  }
  return code;
1386 1387
}

1388
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1389
  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
1390 1391 1392
  if (pReq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
1393

L
Liu Jicong 已提交
1394 1395 1396
  /*strcpy(pReq->topic, pTopic->topicName);*/
  /*strcpy(pReq->cgroup, tmq->groupId);*/

L
Liu Jicong 已提交
1397 1398 1399 1400
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, groupLen);
  pReq->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
1401

1402
  pReq->withTbName = tmq->withTbName;
1403
  pReq->timeout = timeout;
L
Liu Jicong 已提交
1404
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1405
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1406
  /*pReq->currentOffset = reqOffset;*/
L
Liu Jicong 已提交
1407
  pReq->reqOffset = pVg->currentOffset;
L
Liu Jicong 已提交
1408
  pReq->reqId = generateRequestId();
1409

L
Liu Jicong 已提交
1410 1411
  pReq->useSnapshot = tmq->useSnapshot;

1412
  pReq->head.vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
1413
  pReq->head.contLen = htonl(sizeof(SMqPollReq));
1414 1415 1416
  return pReq;
}

L
Liu Jicong 已提交
1417 1418
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
L
Liu Jicong 已提交
1419
  pRspObj->resType = RES_TYPE__TMQ_META;
L
Liu Jicong 已提交
1420 1421 1422 1423 1424 1425 1426 1427
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1428 1429 1430
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
L
Liu Jicong 已提交
1431 1432
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
1433
  pRspObj->vgId = pWrapper->vgHandle->vgId;
L
Liu Jicong 已提交
1434
  pRspObj->resIter = -1;
L
Liu Jicong 已提交
1435
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
L
Liu Jicong 已提交
1436

L
Liu Jicong 已提交
1437 1438
  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
L
Liu Jicong 已提交
1439
  if (!pWrapper->dataRsp.withSchema) {
L
Liu Jicong 已提交
1440 1441
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }
L
Liu Jicong 已提交
1442

L
Liu Jicong 已提交
1443
  return pRspObj;
X
Xiaoyu Wang 已提交
1444 1445
}

1446
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1447
  /*tscDebug("call poll");*/
X
Xiaoyu Wang 已提交
1448 1449 1450 1451 1452 1453
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1454
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1455 1456
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
X
Xiaoyu Wang 已提交
1457
        continue;
L
Liu Jicong 已提交
1458
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1459 1460 1461 1462
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
S
Shengliang Guan 已提交
1463
        tscDebug("consumer:%" PRId64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1464 1465
        }
#endif
X
Xiaoyu Wang 已提交
1466
      }
L
Liu Jicong 已提交
1467
      atomic_store_32(&pVg->vgSkipCnt, 0);
1468
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
X
Xiaoyu Wang 已提交
1469 1470
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1471
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1472 1473
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1474
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1475
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1476
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1477
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1478
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1479 1480
        return -1;
      }
1481 1482 1483
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;

L
Liu Jicong 已提交
1484
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1485
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1486
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1487

1488
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1489
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1490 1491
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1492
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1493
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1494 1495 1496 1497
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1498
          .pData = pReq,
L
Liu Jicong 已提交
1499
          .len = sizeof(SMqPollReq),
X
Xiaoyu Wang 已提交
1500 1501
          .handle = NULL,
      };
L
Liu Jicong 已提交
1502
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1503
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1504
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1505
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1506
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1507 1508

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1509
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1510

1511
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1512
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
L
Liu Jicong 已提交
1513 1514
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
S
Shengliang Guan 已提交
1515
      /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1516 1517 1518 1519 1520 1521 1522 1523
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1524 1525
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1526
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1527 1528
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1529
      SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;
L
Liu Jicong 已提交
1530
      tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1531
      /*tmqClearUnhandleMsg(tmq);*/
X
Xiaoyu Wang 已提交
1532 1533 1534 1535 1536 1537 1538 1539 1540 1541
      *pReset = true;
    } else {
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
1542
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1543
  while (1) {
L
Liu Jicong 已提交
1544 1545 1546
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1547
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1548 1549
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
X
Xiaoyu Wang 已提交
1550 1551
    }

L
Liu Jicong 已提交
1552 1553 1554 1555 1556
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1557
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1558
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
1559
      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1560
      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1561
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
S
Shengliang Guan 已提交
1562
        /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
L
Liu Jicong 已提交
1563
         * rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1564
        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
X
Xiaoyu Wang 已提交
1565
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1566
        if (pollRspWrapper->dataRsp.blockNum == 0) {
L
Liu Jicong 已提交
1567 1568
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1569 1570
          continue;
        }
L
Liu Jicong 已提交
1571
        // build rsp
L
Liu Jicong 已提交
1572
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1573
        taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
1574
        return pRsp;
X
Xiaoyu Wang 已提交
1575
      } else {
L
Liu Jicong 已提交
1576 1577 1578 1579 1580 1581 1582
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1583
      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1584
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
S
Shengliang Guan 已提交
1585
        /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
L
Liu Jicong 已提交
1586
         * rspMsg->msg.rspOffset);*/
wmmhello's avatar
wmmhello 已提交
1587
        pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
L
Liu Jicong 已提交
1588 1589
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
L
Liu Jicong 已提交
1590
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1591 1592 1593 1594
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
L
Liu Jicong 已提交
1595
                 pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
L
Liu Jicong 已提交
1596
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1597 1598
      }
    } else {
L
fix  
Liu Jicong 已提交
1599
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1600
      bool reset = false;
L
Liu Jicong 已提交
1601 1602
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1603
      if (pollIfReset && reset) {
S
Shengliang Guan 已提交
1604
        tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
1605
        tmqPollImpl(tmq, timeout);
X
Xiaoyu Wang 已提交
1606 1607 1608 1609 1610
      }
    }
  }
}

1611
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1612
  /*tscDebug("call poll1");*/
L
Liu Jicong 已提交
1613 1614
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1615

1616 1617 1618
#if 0
  tmqHandleAllDelayedTask(tmq);
  tmqPollImpl(tmq, timeout);
1619
  rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1620 1621
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1622
  }
1623
#endif
X
Xiaoyu Wang 已提交
1624

1625
  // in no topic status, delayed task also need to be processed
L
Liu Jicong 已提交
1626
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
1627 1628 1629
    return NULL;
  }

L
Liu Jicong 已提交
1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

X
Xiaoyu Wang 已提交
1641
  while (1) {
L
Liu Jicong 已提交
1642
    tmqHandleAllDelayedTask(tmq);
1643
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;
L
Liu Jicong 已提交
1644

1645
    rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1646 1647
    if (rspObj) {
      return (TAOS_RES*)rspObj;
L
Liu Jicong 已提交
1648 1649
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
X
Xiaoyu Wang 已提交
1650
    }
1651
    if (timeout != -1) {
X
Xiaoyu Wang 已提交
1652
      int64_t endTime = taosGetTimestampMs();
1653
      int64_t leftTime = endTime - startTime;
1654
      if (leftTime > timeout) {
S
Shengliang Guan 已提交
1655
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
X
Xiaoyu Wang 已提交
1656 1657
        return NULL;
      }
1658
      tsem_timewait(&tmq->rspSem, leftTime * 1000);
L
Liu Jicong 已提交
1659 1660 1661
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
X
Xiaoyu Wang 已提交
1662 1663 1664 1665
    }
  }
}

L
Liu Jicong 已提交
1666
int32_t tmq_consumer_close(tmq_t* tmq) {
1667
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
L
Liu Jicong 已提交
1668 1669
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
L
Liu Jicong 已提交
1670
      return rsp;
1671 1672
    }

L
Liu Jicong 已提交
1673
    int32_t     retryCnt = 0;
1674
    tmq_list_t* lst = tmq_list_new();
L
Liu Jicong 已提交
1675 1676 1677 1678 1679 1680 1681 1682 1683 1684
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      } else {
        retryCnt++;
        taosMsleep(500);
      }
    }

1685
    tmq_list_destroy(lst);
1686

L
Liu Jicong 已提交
1687 1688
    /*return rsp;*/
    return 0;
L
Liu Jicong 已提交
1689
  }
1690
  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
L
Liu Jicong 已提交
1691
  return 0;
1692
}
L
Liu Jicong 已提交
1693

L
Liu Jicong 已提交
1694 1695
const char* tmq_err2str(int32_t err) {
  if (err == 0) {
L
Liu Jicong 已提交
1696
    return "success";
L
Liu Jicong 已提交
1697
  } else if (err == -1) {
L
Liu Jicong 已提交
1698 1699 1700
    return "fail";
  } else {
    return tstrerror(err);
L
Liu Jicong 已提交
1701 1702
  }
}
L
Liu Jicong 已提交
1703

L
Liu Jicong 已提交
1704 1705 1706 1707
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
wmmhello's avatar
wmmhello 已提交
1708 1709 1710 1711
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
      return TMQ_RES_DATA;
    }
L
Liu Jicong 已提交
1712 1713 1714 1715 1716 1717
    return TMQ_RES_TABLE_META;
  } else {
    return TMQ_RES_INVALID;
  }
}

L
Liu Jicong 已提交
1718
const char* tmq_get_topic_name(TAOS_RES* res) {
L
Liu Jicong 已提交
1719 1720
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
L
Liu Jicong 已提交
1721
    return strchr(pRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1722 1723 1724
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1725 1726 1727 1728 1729
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1730 1731 1732 1733
const char* tmq_get_db_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return strchr(pRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1734 1735 1736
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1737 1738 1739 1740 1741
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1742 1743 1744 1745
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
L
Liu Jicong 已提交
1746 1747 1748
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return pMetaRspObj->vgId;
L
Liu Jicong 已提交
1749 1750 1751 1752
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
1753 1754 1755 1756 1757 1758 1759 1760

const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
1761
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
L
Liu Jicong 已提交
1762 1763 1764
  }
  return NULL;
}
1765

L
Liu Jicong 已提交
1766
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
1767
  //
L
Liu Jicong 已提交
1768
  tmqCommitInner(tmq, msg, 0, 1, cb, param);
L
Liu Jicong 已提交
1769 1770
}

1771 1772
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  //
L
Liu Jicong 已提交
1773
  return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
1774
}