tmq.c 51.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
31 32 33
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
L
Liu Jicong 已提交
34 35 36
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
37

L
Liu Jicong 已提交
38 39 40 41 42 43
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
44 45 46
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
47 48
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
49
struct tmq_list_t {
L
Liu Jicong 已提交
50
  SArray container;
L
Liu Jicong 已提交
51
};
L
Liu Jicong 已提交
52

L
Liu Jicong 已提交
53
struct tmq_conf_t {
54 55 56 57 58 59 60 61 62 63
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
  int8_t  ssEnable;
  int32_t ssBatchSize;

  bool hbBgEnable;

64 65 66 67 68
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
69
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
70
  void*          commitCbUserParam;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
74
  int64_t refId;
L
Liu Jicong 已提交
75
  // conf
76 77 78 79 80 81 82 83 84 85 86
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
87 88
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
89 90 91 92

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
93 94
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
95
  int32_t epSkipCnt;
L
Liu Jicong 已提交
96
#endif
L
Liu Jicong 已提交
97 98
  int64_t pollCnt;

L
Liu Jicong 已提交
99
  // timer
100 101
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
102 103 104
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
105 106 107 108
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
109
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
110
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
111
  STaosQall*  qall;
L
Liu Jicong 已提交
112 113 114 115
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
116 117
};

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
126
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
127
  TMQ_CONSUMER_STATUS__RECOVER,
L
Liu Jicong 已提交
128 129
};

L
Liu Jicong 已提交
130
enum {
131
  TMQ_DELAYED_TASK__ASK_EP = 1,
L
Liu Jicong 已提交
132 133 134 135
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
136
typedef struct {
137 138 139
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
142
  // connection info
143
  int32_t vgId;
X
Xiaoyu Wang 已提交
144
  int32_t vgStatus;
L
Liu Jicong 已提交
145
  int32_t vgSkipCnt;
146 147 148
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
149
typedef struct {
150
  // subscribe info
151 152
  char topicName[TSDB_TOPIC_FNAME_LEN];
  char db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
153 154 155

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
156
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
  };
L
Liu Jicong 已提交
168 169
} SMqPollRspWrapper;

L
Liu Jicong 已提交
170
typedef struct {
171 172
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
173 174
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
175
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
176

L
Liu Jicong 已提交
177
typedef struct {
178 179
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
180
  int32_t code;
L
Liu Jicong 已提交
181
  int32_t async;
X
Xiaoyu Wang 已提交
182
  tsem_t  rspSem;
183 184
} SMqAskEpCbParam;

L
Liu Jicong 已提交
185
typedef struct {
186 187
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
188
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
189
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
190
  int32_t         vgId;
L
Liu Jicong 已提交
191
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
192
} SMqPollCbParam;
193

194
typedef struct {
195 196
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
197 198
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
199 200
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
201
  int32_t        rspErr;
202
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
203 204 205 206
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
207 208 209 210 211
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
212
} SMqCommitCbParam;
213

214
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
215
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
216
  conf->withTbName = false;
L
Liu Jicong 已提交
217
  conf->autoCommit = true;
L
Liu Jicong 已提交
218
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
219
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
220
  conf->hbBgEnable = true;
221 222 223
  return conf;
}

L
Liu Jicong 已提交
224
void tmq_conf_destroy(tmq_conf_t* conf) {
L
Liu Jicong 已提交
225 226 227 228 229 230
  if (conf) {
    if (conf->ip) taosMemoryFree(conf->ip);
    if (conf->user) taosMemoryFree(conf->user);
    if (conf->pass) taosMemoryFree(conf->pass);
    taosMemoryFree(conf);
  }
L
Liu Jicong 已提交
231 232 233
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
234 235
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
236
    return TMQ_CONF_OK;
237
  }
L
Liu Jicong 已提交
238

239 240
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
241 242
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
243

L
Liu Jicong 已提交
244 245
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
246
      conf->autoCommit = true;
L
Liu Jicong 已提交
247 248
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
249
      conf->autoCommit = false;
L
Liu Jicong 已提交
250 251 252 253
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
254
  }
L
Liu Jicong 已提交
255

L
Liu Jicong 已提交
256 257 258 259 260
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
275

276 277
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
278
      conf->withTbName = true;
L
Liu Jicong 已提交
279
      return TMQ_CONF_OK;
280
    } else if (strcmp(value, "false") == 0) {
281
      conf->withTbName = false;
L
Liu Jicong 已提交
282
      return TMQ_CONF_OK;
283 284 285 286 287
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
288
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
289
    if (strcmp(value, "true") == 0) {
290 291 292 293 294 295 296 297 298 299 300 301 302
      conf->ssEnable = true;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
      conf->ssEnable = false;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
303 304
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
305
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
306 307 308 309
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
310
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
311 312
  }

L
Liu Jicong 已提交
313
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
314
    conf->ssBatchSize = atoi(value);
L
Liu Jicong 已提交
315 316 317
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
318
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
319 320 321
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
322
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
323 324 325
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
326
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
327 328 329
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
330
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
331 332 333
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
334
  if (strcmp(key, "td.connect.db") == 0) {
335
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
336 337 338
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
339
  return TMQ_CONF_UNKNOWN;
340 341 342
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
343 344
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
345 346
}

L
Liu Jicong 已提交
347 348
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
349 350 351 352 353
  if (src == NULL || src[0] == 0) return -1;
  char* topic = strdup(src);
  if (topic[0] != '`') {
    strtolower(topic, src);
  }
L
fix  
Liu Jicong 已提交
354
  if (taosArrayPush(container, &topic) == NULL) return -1;
355 356 357
  return 0;
}

L
Liu Jicong 已提交
358
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
359
  SArray* container = &list->container;
L
Liu Jicong 已提交
360
  taosArrayDestroyP(container, taosMemoryFree);
L
Liu Jicong 已提交
361 362
}

L
Liu Jicong 已提交
363 364 365 366 367 368 369 370 371 372
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return container->pData;
}

L
Liu Jicong 已提交
373 374 375 376
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId);
  if (tmq == NULL) {
    if (!pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
    }
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // if no more waiting rsp
  if (pParamSet->async) {
    // call async cb func
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      // sem post
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

#if 0
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
  return 0;
}

409 410
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
411 412
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
  // push into array
L
Liu Jicong 已提交
413
#if 0
414 415 416 417 418
  if (code == 0) {
    taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
  } else {
    taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
  }
L
Liu Jicong 已提交
419
#endif
L
Liu Jicong 已提交
420

L
Liu Jicong 已提交
421 422 423
  taosMemoryFree(pParam->pOffset);
  if (pBuf->pData) taosMemoryFree(pBuf->pData);

S
Shengliang Guan 已提交
424
  /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
L
Liu Jicong 已提交
425 426
   * pOffset->version);*/

427
  // count down waiting rsp
L
Liu Jicong 已提交
428
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
429 430 431
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
432
    tmqCommitDone(pParamSet);
433 434 435 436
  }
  return 0;
}

L
Liu Jicong 已提交
437 438 439 440 441 442
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
L
Liu Jicong 已提交
443
  pOffset->val = pVg->currentOffset;
444

L
Liu Jicong 已提交
445 446 447 448
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
L
Liu Jicong 已提交
449

L
Liu Jicong 已提交
450 451 452 453 454 455 456 457 458 459
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    return -1;
  }
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) return -1;
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
460

L
Liu Jicong 已提交
461 462 463 464 465 466 467
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
468
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
469 470 471 472 473 474 475 476 477 478 479 480 481 482
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    return -1;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

S
Shengliang Guan 已提交
483 484
  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);
L
Liu Jicong 已提交
485 486

  // TODO: put into cb
L
Liu Jicong 已提交
487
  pVg->committedOffset = pVg->currentOffset;
L
Liu Jicong 已提交
488 489 490 491

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
L
Liu Jicong 已提交
492
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
493
  pMsgSendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
494 495 496
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
  // send msg

L
Liu Jicong 已提交
497 498 499
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

L
Liu Jicong 已提交
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;
}

int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
526 527
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
L
Liu Jicong 已提交
528 529 530 531 532 533
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

L
Liu Jicong 已提交
534 535
  int32_t code = -1;

L
Liu Jicong 已提交
536 537 538 539 540 541 542
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

L
Liu Jicong 已提交
543
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
L
Liu Jicong 已提交
544 545
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
L
Liu Jicong 已提交
546
        }
L
Liu Jicong 已提交
547
        goto HANDLE_RSP;
L
Liu Jicong 已提交
548 549
      }
    }
L
Liu Jicong 已提交
550
  }
L
Liu Jicong 已提交
551

L
Liu Jicong 已提交
552 553 554 555
HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
L
Liu Jicong 已提交
556 557 558
    return 0;
  }

L
Liu Jicong 已提交
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    return code;
  } else {
    code = 0;
  }

FAIL:
  if (code != 0 && async) {
    userCb(tmq, code, userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
575 576
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
L
Liu Jicong 已提交
577 578 579 580 581 582
  int32_t code = -1;

  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

583 584 585 586 587
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
588 589 590 591

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

592 593 594 595 596 597
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

598 599 600
  // init as 1 to prevent concurrency issue
  pParamSet->waitingRspNum = 1;

601 602
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
603

S
Shengliang Guan 已提交
604
    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
L
Liu Jicong 已提交
605 606
             (int32_t)taosArrayGetSize(pTopic->vgs));

607
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
608 609
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

S
Shengliang Guan 已提交
610 611
      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);
L
Liu Jicong 已提交
612

L
Liu Jicong 已提交
613 614 615
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer: %ld, vg:%d, current %ld, committed %ld", tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
L
Liu Jicong 已提交
616 617 618
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
619 620 621 622
      }
    }
  }

L
Liu Jicong 已提交
623 624 625 626 627 628
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

629 630 631 632 633 634
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }

635 636 637 638
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
639
    taosMemoryFree(pParamSet);
640 641 642 643 644 645
  } else {
    code = 0;
  }

  if (code != 0 && async) {
    if (automatic) {
L
Liu Jicong 已提交
646
      tmq->commitCb(tmq, code, tmq->commitCbUserParam);
647
    } else {
L
Liu Jicong 已提交
648
      userCb(tmq, code, userParam);
649 650 651
    }
  }

652
#if 0
653
  if (!async) {
654 655
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
656
  }
657
#endif
658 659 660 661

  return 0;
}

662
void tmqAssignAskEpTask(void* param, void* tmrId) {
663 664 665 666 667 668 669 670 671
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
672 673 674
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
675 676 677 678 679 680 681 682 683
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
684 685 686
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
687 688 689 690 691 692 693 694 695
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
696 697
}

698 699 700 701 702 703
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg && pMsg->pData) taosMemoryFree(pMsg->pData);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
704 705 706 707 708
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    return;
  }
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
738
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
739 740
}

L
Liu Jicong 已提交
741 742 743 744 745 746 747 748
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

749
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
L
Liu Jicong 已提交
750
      tmqAskEp(tmq, true);
751 752 753 754 755

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      *pRefId = tmq->refId;

      taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
L
Liu Jicong 已提交
756
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
L
Liu Jicong 已提交
757
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
758 759 760 761 762

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      *pRefId = tmq->refId;

      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
L
Liu Jicong 已提交
763 764 765 766
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
L
Liu Jicong 已提交
767
    taosFreeQitem(pTaskType);
L
Liu Jicong 已提交
768 769 770 771 772
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
773
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
774
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
775 776 777 778 779 780 781 782
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
783
  msg = NULL;
L
Liu Jicong 已提交
784 785 786 787 788 789 790 791 792 793
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
794
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
L
Liu Jicong 已提交
795 796 797 798 799
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  tsem_post(&pParam->rspSem);
  return 0;
}
800

L
Liu Jicong 已提交
801
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
X
Xiaoyu Wang 已提交
802 803 804 805
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
L
Liu Jicong 已提交
806
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
807
    tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
X
Xiaoyu Wang 已提交
808
  }
L
Liu Jicong 已提交
809
  return 0;
X
Xiaoyu Wang 已提交
810 811
}

L
Liu Jicong 已提交
812 813 814
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  int32_t     rsp = tmq_subscribe(tmq, lst);
L
Liu Jicong 已提交
815 816
  tmq_list_destroy(lst);
  return rsp;
X
Xiaoyu Wang 已提交
817 818
}

819 820
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
821

822 823 824 825
  // TODO stop timer
  if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
  if (tmq->qall) taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
826

827
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
828

829 830 831 832 833 834 835 836 837 838
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
    int32_t vgSz = taosArrayGetSize(pTopic->vgs);
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
839 840
}

L
Liu Jicong 已提交
841
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
L
Liu Jicong 已提交
842 843 844 845 846 847 848 849 850
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
851
    tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
L
Liu Jicong 已提交
852 853
  }

L
Liu Jicong 已提交
854 855
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
L
Liu Jicong 已提交
856
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
857 858
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
859 860
    return NULL;
  }
L
Liu Jicong 已提交
861

L
Liu Jicong 已提交
862 863 864 865 866
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
L
Liu Jicong 已提交
867
  ASSERT(conf->groupId[0]);
L
Liu Jicong 已提交
868

L
Liu Jicong 已提交
869 870 871 872 873 874
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
L
Liu Jicong 已提交
875
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
876 877
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
878 879
    goto FAIL;
  }
L
Liu Jicong 已提交
880

L
Liu Jicong 已提交
881 882
  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
L
Liu Jicong 已提交
883 884
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
L
Liu Jicong 已提交
885 886
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/
L
Liu Jicong 已提交
887

L
Liu Jicong 已提交
888 889 890
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
891
  pTmq->withTbName = conf->withTbName;
892
  pTmq->useSnapshot = conf->ssEnable;
L
Liu Jicong 已提交
893
  pTmq->autoCommit = conf->autoCommit;
L
Liu Jicong 已提交
894
  pTmq->autoCommitInterval = conf->autoCommitInterval;
L
Liu Jicong 已提交
895 896
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
L
Liu Jicong 已提交
897 898
  pTmq->resetOffsetCfg = conf->resetOffset;

899 900
  pTmq->hbBgEnable = conf->hbBgEnable;

L
Liu Jicong 已提交
901
  // assign consumerId
L
Liu Jicong 已提交
902
  pTmq->consumerId = tGenIdPI64();
X
Xiaoyu Wang 已提交
903

L
Liu Jicong 已提交
904 905
  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
S
Shengliang Guan 已提交
906 907
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
908 909
    goto FAIL;
  }
L
Liu Jicong 已提交
910

L
Liu Jicong 已提交
911 912 913
  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
S
Shengliang Guan 已提交
914 915
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
L
Liu Jicong 已提交
916 917 918
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }
L
Liu Jicong 已提交
919

920 921 922 923 924 925 926 927 928
  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    tmqFreeImpl(pTmq);
    return NULL;
  }

  int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
  *pRefId = pTmq->refId;

929
  if (pTmq->hbBgEnable) {
930
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
931 932
  }

S
Shengliang Guan 已提交
933
  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
L
Liu Jicong 已提交
934

935
  return pTmq;
L
Liu Jicong 已提交
936 937 938 939 940 941 942 943

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
944 945
}

L
Liu Jicong 已提交
946
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
L
Liu Jicong 已提交
947 948 949 950 951
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;
952 953

  req.consumerId = tmq->consumerId;
L
Liu Jicong 已提交
954
  tstrncpy(req.clientId, tmq->clientId, 256);
L
Liu Jicong 已提交
955
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
956
  req.topicNames = taosArrayInit(sz, sizeof(void*));
L
Liu Jicong 已提交
957
  if (req.topicNames == NULL) goto FAIL;
958

L
Liu Jicong 已提交
959 960
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
961 962

    SName name = {0};
L
Liu Jicong 已提交
963
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
964

L
Liu Jicong 已提交
965 966 967
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
968
    }
L
Liu Jicong 已提交
969
    tNameExtractFullName(&name, topicFName);
970

L
Liu Jicong 已提交
971 972 973
    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
974 975
  }

L
Liu Jicong 已提交
976 977 978 979
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

980 981 982
  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

983
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
984
  if (sendInfo == NULL) goto FAIL;
985

X
Xiaoyu Wang 已提交
986
  SMqSubscribeCbParam param = {
L
Liu Jicong 已提交
987
      .rspErr = 0,
988 989
      .refId = tmq->refId,
      .epoch = tmq->epoch,
X
Xiaoyu Wang 已提交
990
  };
L
Liu Jicong 已提交
991

L
Liu Jicong 已提交
992 993 994
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
995 996 997 998
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
999

L
Liu Jicong 已提交
1000 1001
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1002 1003
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
L
Liu Jicong 已提交
1004 1005
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

1006 1007 1008 1009 1010
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
1011 1012 1013
  // avoid double free if msg is sent
  buf = NULL;

L
Liu Jicong 已提交
1014 1015
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
1016

L
Liu Jicong 已提交
1017 1018 1019
  code = param.rspErr;
  if (code != 0) goto FAIL;

L
Liu Jicong 已提交
1020
  int32_t retryCnt = 0;
L
Liu Jicong 已提交
1021
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
L
Liu Jicong 已提交
1022 1023 1024
    if (retryCnt++ > 10) {
      goto FAIL;
    }
L
fix  
Liu Jicong 已提交
1025
    tscDebug("consumer not ready, retry");
L
Liu Jicong 已提交
1026 1027
    taosMsleep(500);
  }
1028

1029 1030
  // init ep timer
  if (tmq->epTimer == NULL) {
1031 1032 1033
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
1034
  }
L
Liu Jicong 已提交
1035 1036

  // init auto commit timer
1037
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
1038 1039 1040
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
L
Liu Jicong 已提交
1041 1042
  }

L
Liu Jicong 已提交
1043 1044 1045
  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
L
Liu Jicong 已提交
1046
  if (code != 0 && buf) {
L
Liu Jicong 已提交
1047 1048 1049
    taosMemoryFree(buf);
  }
  return code;
1050 1051
}

L
Liu Jicong 已提交
1052
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
L
Liu Jicong 已提交
1053
  //
1054
  conf->commitCb = cb;
L
Liu Jicong 已提交
1055
  conf->commitCbUserParam = param;
L
Liu Jicong 已提交
1056
}
1057

D
dapan1121 已提交
1058
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
1059 1060
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
1061
  SMqClientTopic* pTopic = pParam->pTopic;
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
L
Liu Jicong 已提交
1074
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
1075
  if (code != 0) {
L
Liu Jicong 已提交
1076
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
L
Liu Jicong 已提交
1077
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1078 1079 1080 1081
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
L
Liu Jicong 已提交
1082 1083 1084 1085
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        taosMemoryFree(pMsg->pData);
S
Shengliang Guan 已提交
1086
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
L
Liu Jicong 已提交
1087 1088 1089 1090 1091 1092 1093 1094
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      /*pRspWrapper->vgHandle = pVg;*/
      /*pRspWrapper->topicHandle = pTopic;*/
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
L
fix txn  
Liu Jicong 已提交
1095
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1096 1097
  }

X
Xiaoyu Wang 已提交
1098 1099 1100
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
1101
    // do not write into queue since updating epoch reset
S
Shengliang Guan 已提交
1102
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
1103
            tmqEpoch);
1104
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1105
    taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
1106 1107 1108 1109
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
S
Shengliang Guan 已提交
1110
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
1111 1112
  }

L
Liu Jicong 已提交
1113 1114 1115
  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

1116
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1117
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1118
    taosMemoryFree(pMsg->pData);
S
Shengliang Guan 已提交
1119
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
1120
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1121
  }
L
Liu Jicong 已提交
1122

L
Liu Jicong 已提交
1123
  pRspWrapper->tmqRspType = rspType;
L
Liu Jicong 已提交
1124 1125
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
1126

L
Liu Jicong 已提交
1127
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1128 1129 1130
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
wmmhello's avatar
wmmhello 已提交
1131
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1132
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1133 1134 1135
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
L
Liu Jicong 已提交
1136
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1137
  }
L
Liu Jicong 已提交
1138

L
Liu Jicong 已提交
1139
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1140

S
Shengliang Guan 已提交
1141 1142 1143
  tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
           tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
           rspType);
L
fix  
Liu Jicong 已提交
1144

L
Liu Jicong 已提交
1145
  taosWriteQitem(tmq->mqueue, pRspWrapper);
1146
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1147

L
Liu Jicong 已提交
1148
  return 0;
L
fix txn  
Liu Jicong 已提交
1149
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
1150
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
1151 1152
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
1153
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1154
  return -1;
1155 1156
}

L
Liu Jicong 已提交
1157
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
1158 1159 1160 1161
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
S
Shengliang Guan 已提交
1162
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
S
Shengliang Guan 已提交
1181
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
1182 1183 1184
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
L
Liu Jicong 已提交
1185
        char buf[80];
L
Liu Jicong 已提交
1186
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
L
Liu Jicong 已提交
1187 1188
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
L
Liu Jicong 已提交
1189
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
1190 1191 1192 1193 1194 1195 1196 1197
      }
    }
  }

  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
1198
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
1199 1200
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

S
Shengliang Guan 已提交
1201
    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);
1202 1203 1204 1205 1206 1207

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
L
Liu Jicong 已提交
1208 1209
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
1210
      if (pOffset != NULL) {
L
Liu Jicong 已提交
1211
        offsetNew = *pOffset;
1212 1213 1214 1215
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1216
          .currentOffset = offsetNew,
X
Xiaoyu Wang 已提交
1217 1218 1219
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
1220
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
1221 1222 1223 1224
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
1225
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
1226
  }
1227 1228 1229 1230 1231 1232 1233 1234 1235 1236
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
      int32_t vgSz = taosArrayGetSize(pTopic->vgs);
      taosArrayDestroy(pTopic->vgs);
    }
    taosArrayDestroy(tmq->clientTopics);
  }
L
Liu Jicong 已提交
1237
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1238
  tmq->clientTopics = newTopics;
1239

1240 1241 1242 1243
  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1244

X
Xiaoyu Wang 已提交
1245 1246 1247 1248
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1249
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
1250
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1251
  int8_t           async = pParam->async;
1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    if (!async) {
      tsem_destroy(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    taosMemoryFree(pMsg->pData);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

L
Liu Jicong 已提交
1265
  pParam->code = code;
1266
  if (code != 0) {
S
Shengliang Guan 已提交
1267
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
1268
    goto END;
1269
  }
L
Liu Jicong 已提交
1270

L
Liu Jicong 已提交
1271
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1272
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1273
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1274 1275
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
S
Shengliang Guan 已提交
1276
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1277 1278
  if (head->epoch <= epoch) {
    goto END;
1279
  }
L
Liu Jicong 已提交
1280

L
Liu Jicong 已提交
1281
  if (!async) {
L
Liu Jicong 已提交
1282 1283
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
S
Shengliang Guan 已提交
1284 1285
    /*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
1286
    tmqUpdateEp(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1287
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1288
  } else {
1289
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1290
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1291
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1292 1293
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1294
    }
L
Liu Jicong 已提交
1295 1296 1297
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1298
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1299

L
Liu Jicong 已提交
1300
    taosWriteQitem(tmq->mqueue, pWrapper);
1301
    tsem_post(&tmq->rspSem);
1302
  }
L
Liu Jicong 已提交
1303 1304

END:
L
Liu Jicong 已提交
1305
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1306
  if (!async) {
L
Liu Jicong 已提交
1307
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
1308 1309
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
1310
  }
L
Liu Jicong 已提交
1311
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1312
  return code;
1313 1314
}

L
Liu Jicong 已提交
1315
int32_t tmqAskEp(tmq_t* tmq, bool async) {
L
Liu Jicong 已提交
1316
  int32_t code = 0;
L
Liu Jicong 已提交
1317
#if 0
L
Liu Jicong 已提交
1318
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
L
fix txn  
Liu Jicong 已提交
1319
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
1320
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
S
Shengliang Guan 已提交
1321
    tscTrace("consumer:%" PRId64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
1322
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
1323
  }
L
temp  
Liu Jicong 已提交
1324
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
1325
#endif
L
Liu Jicong 已提交
1326
  int32_t      tlen = sizeof(SMqAskEpReq);
L
Liu Jicong 已提交
1327
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
L
Liu Jicong 已提交
1328
  if (req == NULL) {
L
Liu Jicong 已提交
1329
    tscError("failed to malloc get subscribe ep buf");
L
Liu Jicong 已提交
1330
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1331
    return -1;
L
Liu Jicong 已提交
1332
  }
L
Liu Jicong 已提交
1333 1334 1335
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);
1336

L
Liu Jicong 已提交
1337
  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
1338 1339
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
wafwerar's avatar
wafwerar 已提交
1340
    taosMemoryFree(req);
L
Liu Jicong 已提交
1341
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1342
    return -1;
L
Liu Jicong 已提交
1343
  }
1344 1345
  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
L
Liu Jicong 已提交
1346
  pParam->async = async;
X
Xiaoyu Wang 已提交
1347
  tsem_init(&pParam->rspSem, 0, 0);
1348

1349
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1350 1351
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1352 1353
    taosMemoryFree(pParam);
    taosMemoryFree(req);
L
Liu Jicong 已提交
1354
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1355 1356 1357 1358 1359 1360 1361 1362 1363 1364
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1365 1366 1367
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1368
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;
1369

L
Liu Jicong 已提交
1370 1371
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

S
Shengliang Guan 已提交
1372
  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);
L
add log  
Liu Jicong 已提交
1373

L
Liu Jicong 已提交
1374 1375
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1376

L
Liu Jicong 已提交
1377
  if (!async) {
L
Liu Jicong 已提交
1378 1379 1380 1381 1382
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    taosMemoryFree(pParam);
  }
  return code;
1383 1384
}

1385
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1386
  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
1387 1388 1389
  if (pReq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
1390

L
Liu Jicong 已提交
1391 1392 1393
  /*strcpy(pReq->topic, pTopic->topicName);*/
  /*strcpy(pReq->cgroup, tmq->groupId);*/

L
Liu Jicong 已提交
1394 1395 1396 1397
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, groupLen);
  pReq->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
1398

1399
  pReq->withTbName = tmq->withTbName;
1400
  pReq->timeout = timeout;
L
Liu Jicong 已提交
1401
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1402
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1403
  /*pReq->currentOffset = reqOffset;*/
L
Liu Jicong 已提交
1404
  pReq->reqOffset = pVg->currentOffset;
L
Liu Jicong 已提交
1405
  pReq->reqId = generateRequestId();
1406

L
Liu Jicong 已提交
1407 1408
  pReq->useSnapshot = tmq->useSnapshot;

1409
  pReq->head.vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
1410
  pReq->head.contLen = htonl(sizeof(SMqPollReq));
1411 1412 1413
  return pReq;
}

L
Liu Jicong 已提交
1414 1415
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
L
Liu Jicong 已提交
1416
  pRspObj->resType = RES_TYPE__TMQ_META;
L
Liu Jicong 已提交
1417 1418 1419 1420 1421 1422 1423 1424
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1425 1426 1427
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
L
Liu Jicong 已提交
1428 1429
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
1430
  pRspObj->vgId = pWrapper->vgHandle->vgId;
L
Liu Jicong 已提交
1431
  pRspObj->resIter = -1;
L
Liu Jicong 已提交
1432
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
L
Liu Jicong 已提交
1433

L
Liu Jicong 已提交
1434 1435
  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
L
Liu Jicong 已提交
1436
  if (!pWrapper->dataRsp.withSchema) {
L
Liu Jicong 已提交
1437 1438
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }
L
Liu Jicong 已提交
1439

L
Liu Jicong 已提交
1440
  return pRspObj;
X
Xiaoyu Wang 已提交
1441 1442
}

1443
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1444
  /*tscDebug("call poll");*/
X
Xiaoyu Wang 已提交
1445 1446 1447 1448 1449 1450
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1451
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1452 1453
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
X
Xiaoyu Wang 已提交
1454
        continue;
L
Liu Jicong 已提交
1455
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1456 1457 1458 1459
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
S
Shengliang Guan 已提交
1460
        tscDebug("consumer:%" PRId64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1461 1462
        }
#endif
X
Xiaoyu Wang 已提交
1463
      }
L
Liu Jicong 已提交
1464
      atomic_store_32(&pVg->vgSkipCnt, 0);
1465
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
X
Xiaoyu Wang 已提交
1466 1467
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1468
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1469 1470
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1471
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1472
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1473
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1474
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1475
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1476 1477
        return -1;
      }
1478 1479 1480
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;

L
Liu Jicong 已提交
1481
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1482
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1483
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1484

1485
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1486
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1487 1488
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1489
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1490
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1491 1492 1493 1494
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1495
          .pData = pReq,
L
Liu Jicong 已提交
1496
          .len = sizeof(SMqPollReq),
X
Xiaoyu Wang 已提交
1497 1498
          .handle = NULL,
      };
L
Liu Jicong 已提交
1499
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1500
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1501
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1502
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1503
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1504 1505

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1506
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1507

1508
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1509
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
L
Liu Jicong 已提交
1510 1511
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
S
Shengliang Guan 已提交
1512
      /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1513 1514 1515 1516 1517 1518 1519 1520
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1521 1522
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1523
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1524 1525
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1526
      SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;
L
Liu Jicong 已提交
1527
      tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1528
      /*tmqClearUnhandleMsg(tmq);*/
X
Xiaoyu Wang 已提交
1529 1530 1531 1532 1533 1534 1535 1536 1537 1538
      *pReset = true;
    } else {
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
1539
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1540
  while (1) {
L
Liu Jicong 已提交
1541 1542 1543
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1544
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1545 1546
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
X
Xiaoyu Wang 已提交
1547 1548
    }

L
Liu Jicong 已提交
1549 1550 1551 1552 1553
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1554
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1555
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
1556
      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1557
      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1558
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
S
Shengliang Guan 已提交
1559
        /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
L
Liu Jicong 已提交
1560
         * rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1561
        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
X
Xiaoyu Wang 已提交
1562
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1563
        if (pollRspWrapper->dataRsp.blockNum == 0) {
L
Liu Jicong 已提交
1564 1565
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1566 1567
          continue;
        }
L
Liu Jicong 已提交
1568
        // build rsp
L
Liu Jicong 已提交
1569
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1570
        taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
1571
        return pRsp;
X
Xiaoyu Wang 已提交
1572
      } else {
L
Liu Jicong 已提交
1573 1574 1575 1576 1577 1578 1579
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1580
      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1581
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
S
Shengliang Guan 已提交
1582
        /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
L
Liu Jicong 已提交
1583
         * rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1584 1585
        pVg->currentOffset.version = pollRspWrapper->metaRsp.rspOffset;
        pVg->currentOffset.type = TMQ_OFFSET__LOG;
L
Liu Jicong 已提交
1586 1587
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
L
Liu Jicong 已提交
1588
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1589 1590 1591 1592
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
L
Liu Jicong 已提交
1593
                 pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
L
Liu Jicong 已提交
1594
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1595 1596
      }
    } else {
L
fix  
Liu Jicong 已提交
1597
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1598
      bool reset = false;
L
Liu Jicong 已提交
1599 1600
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1601
      if (pollIfReset && reset) {
S
Shengliang Guan 已提交
1602
        tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
1603
        tmqPollImpl(tmq, timeout);
X
Xiaoyu Wang 已提交
1604 1605 1606 1607 1608
      }
    }
  }
}

1609
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1610
  /*tscDebug("call poll1");*/
L
Liu Jicong 已提交
1611 1612
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1613

1614 1615 1616
#if 0
  tmqHandleAllDelayedTask(tmq);
  tmqPollImpl(tmq, timeout);
1617
  rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1618 1619
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1620
  }
1621
#endif
X
Xiaoyu Wang 已提交
1622

1623
  // in no topic status, delayed task also need to be processed
L
Liu Jicong 已提交
1624
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
1625 1626 1627
    return NULL;
  }

L
Liu Jicong 已提交
1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

X
Xiaoyu Wang 已提交
1639
  while (1) {
L
Liu Jicong 已提交
1640
    tmqHandleAllDelayedTask(tmq);
1641
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;
L
Liu Jicong 已提交
1642

1643
    rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1644 1645
    if (rspObj) {
      return (TAOS_RES*)rspObj;
L
Liu Jicong 已提交
1646 1647
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
X
Xiaoyu Wang 已提交
1648
    }
1649
    if (timeout != -1) {
X
Xiaoyu Wang 已提交
1650
      int64_t endTime = taosGetTimestampMs();
1651
      int64_t leftTime = endTime - startTime;
1652
      if (leftTime > timeout) {
S
Shengliang Guan 已提交
1653
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
X
Xiaoyu Wang 已提交
1654 1655
        return NULL;
      }
1656
      tsem_timewait(&tmq->rspSem, leftTime * 1000);
L
Liu Jicong 已提交
1657 1658 1659
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
X
Xiaoyu Wang 已提交
1660 1661 1662 1663
    }
  }
}

L
Liu Jicong 已提交
1664
int32_t tmq_consumer_close(tmq_t* tmq) {
1665
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
L
Liu Jicong 已提交
1666 1667
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
L
Liu Jicong 已提交
1668
      return rsp;
1669 1670
    }

L
Liu Jicong 已提交
1671
    int32_t     retryCnt = 0;
1672
    tmq_list_t* lst = tmq_list_new();
L
Liu Jicong 已提交
1673 1674 1675 1676 1677 1678 1679 1680 1681 1682
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      } else {
        retryCnt++;
        taosMsleep(500);
      }
    }

1683
    tmq_list_destroy(lst);
1684

L
Liu Jicong 已提交
1685 1686
    /*return rsp;*/
    return 0;
L
Liu Jicong 已提交
1687
  }
1688
  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
L
Liu Jicong 已提交
1689
  return 0;
1690
}
L
Liu Jicong 已提交
1691

L
Liu Jicong 已提交
1692 1693
const char* tmq_err2str(int32_t err) {
  if (err == 0) {
L
Liu Jicong 已提交
1694
    return "success";
L
Liu Jicong 已提交
1695
  } else if (err == -1) {
L
Liu Jicong 已提交
1696 1697 1698
    return "fail";
  } else {
    return tstrerror(err);
L
Liu Jicong 已提交
1699 1700
  }
}
L
Liu Jicong 已提交
1701

L
Liu Jicong 已提交
1702 1703 1704 1705
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
wmmhello's avatar
wmmhello 已提交
1706 1707 1708 1709
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
      return TMQ_RES_DATA;
    }
L
Liu Jicong 已提交
1710 1711 1712 1713 1714 1715
    return TMQ_RES_TABLE_META;
  } else {
    return TMQ_RES_INVALID;
  }
}

L
Liu Jicong 已提交
1716
const char* tmq_get_topic_name(TAOS_RES* res) {
L
Liu Jicong 已提交
1717 1718
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
L
Liu Jicong 已提交
1719
    return strchr(pRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1720 1721 1722
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1723 1724 1725 1726 1727
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1728 1729 1730 1731
const char* tmq_get_db_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return strchr(pRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1732 1733 1734
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1735 1736 1737 1738 1739
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1740 1741 1742 1743
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
L
Liu Jicong 已提交
1744 1745 1746
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return pMetaRspObj->vgId;
L
Liu Jicong 已提交
1747 1748 1749 1750
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
1751 1752 1753 1754 1755 1756 1757 1758

const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
1759
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
L
Liu Jicong 已提交
1760 1761 1762
  }
  return NULL;
}
1763

L
Liu Jicong 已提交
1764
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
1765
  //
L
Liu Jicong 已提交
1766
  tmqCommitInner(tmq, msg, 0, 1, cb, param);
L
Liu Jicong 已提交
1767 1768
}

1769 1770
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  //
L
Liu Jicong 已提交
1771
  return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
1772
}