tmq.c 53.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
31 32 33
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
L
Liu Jicong 已提交
34 35 36
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
37

L
Liu Jicong 已提交
38 39 40 41 42 43
// Common header of the response wrappers in this file: SMqAskEpRspWrapper and
// SMqPollRspWrapper both begin with these same two fields, and
// tmqClearUnhandleMsg reads queue items through this type.
// NOTE(review): tmqRspType presumably tags which concrete wrapper an item is,
// and epoch the consumer epoch it belongs to — confirm against the producers.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
44 45 46
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
47 48
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
49
struct tmq_list_t {
L
Liu Jicong 已提交
50
  SArray container;
L
Liu Jicong 已提交
51
};
L
Liu Jicong 已提交
52

L
Liu Jicong 已提交
53
struct tmq_conf_t {
54 55 56 57 58
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
59 60
  int8_t  snapEnable;
  int32_t snapBatchSize;
61 62 63

  bool hbBgEnable;

64 65 66 67 68
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
69
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
70
  void*          commitCbUserParam;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
74
  int64_t refId;
L
Liu Jicong 已提交
75
  // conf
76 77 78 79 80 81 82 83 84 85 86
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
87 88
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
89 90 91 92

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
93 94
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
95
  int32_t epSkipCnt;
L
Liu Jicong 已提交
96
#endif
L
Liu Jicong 已提交
97 98
  int64_t pollCnt;

L
Liu Jicong 已提交
99
  // timer
100 101
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
102 103 104
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
105 106 107 108
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
109
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
110
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
111
  STaosQall*  qall;
L
Liu Jicong 已提交
112 113 114 115
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
116 117
};

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
// Per-vgroup status values.
// NOTE(review): presumably the values stored in SMqClientVg.vgStatus — confirm.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
126
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
127
  TMQ_CONSUMER_STATUS__RECOVER,
L
Liu Jicong 已提交
128 129
};

L
Liu Jicong 已提交
130
enum {
131
  TMQ_DELAYED_TASK__ASK_EP = 1,
L
Liu Jicong 已提交
132 133 134 135
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
136
typedef struct {
137 138 139
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
142
  // connection info
143
  int32_t vgId;
X
Xiaoyu Wang 已提交
144
  int32_t vgStatus;
L
Liu Jicong 已提交
145
  int32_t vgSkipCnt;
146 147 148
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
149
typedef struct {
150
  // subscribe info
151 152
  char topicName[TSDB_TOPIC_FNAME_LEN];
  char db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
153 154 155

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
156
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
168
  };
L
Liu Jicong 已提交
169 170
} SMqPollRspWrapper;

L
Liu Jicong 已提交
171
typedef struct {
172 173
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
174 175
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
176
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
177

L
Liu Jicong 已提交
178
typedef struct {
179 180
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
181
  int32_t code;
L
Liu Jicong 已提交
182
  int32_t async;
X
Xiaoyu Wang 已提交
183
  tsem_t  rspSem;
184 185
} SMqAskEpCbParam;

L
Liu Jicong 已提交
186
typedef struct {
187 188
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
189
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
190
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
191
  int32_t         vgId;
L
Liu Jicong 已提交
192
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
193
} SMqPollCbParam;
194

195
typedef struct {
196 197
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
198 199
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
200 201
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
202
  int32_t        rspErr;
203
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
204 205 206 207
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
208 209 210 211 212
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
213
} SMqCommitCbParam;
214

215
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
216
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
217
  conf->withTbName = false;
L
Liu Jicong 已提交
218
  conf->autoCommit = true;
L
Liu Jicong 已提交
219
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
220
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
221
  conf->hbBgEnable = true;
222 223 224
  return conf;
}

L
Liu Jicong 已提交
225
/**
 * Release a configuration created by tmq_conf_new, including the ip, user and
 * pass strings it owns. Passing NULL is a no-op.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  char* owned[] = {conf->ip, conf->user, conf->pass};
  for (size_t i = 0; i < sizeof(owned) / sizeof(owned[0]); i++) {
    if (owned[i] != NULL) taosMemoryFree(owned[i]);
  }
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
235 236
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
237
    return TMQ_CONF_OK;
238
  }
L
Liu Jicong 已提交
239

240 241
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
242 243
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
244

L
Liu Jicong 已提交
245 246
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
247
      conf->autoCommit = true;
L
Liu Jicong 已提交
248 249
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
250
      conf->autoCommit = false;
L
Liu Jicong 已提交
251 252 253 254
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
255
  }
L
Liu Jicong 已提交
256

L
Liu Jicong 已提交
257 258 259 260 261
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274 275
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
276

277 278
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
279
      conf->withTbName = true;
L
Liu Jicong 已提交
280
      return TMQ_CONF_OK;
281
    } else if (strcmp(value, "false") == 0) {
282
      conf->withTbName = false;
L
Liu Jicong 已提交
283
      return TMQ_CONF_OK;
284 285 286 287 288
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
289
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
290
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
291
      conf->snapEnable = true;
292 293
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
294
      conf->snapEnable = false;
295 296 297 298 299 300
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
301 302 303 304 305
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

306 307 308
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
309 310
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
311
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
312 313 314 315
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
316
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
317 318
  }

L
Liu Jicong 已提交
319
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
320 321 322
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
323
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
324 325 326
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
327
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
328 329 330
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
331
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
332 333 334
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
335
  if (strcmp(key, "td.connect.db") == 0) {
336
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
337 338 339
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
340
  return TMQ_CONF_UNKNOWN;
341 342 343
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
344 345
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
346 347
}

L
Liu Jicong 已提交
348 349
/**
 * Append a copy of a topic name to the list.
 *
 * Names that do not start with a backquote are passed through strtolower; a
 * name starting with '`' is stored exactly as given. The list owns the copy.
 *
 * @param list destination list
 * @param src  topic name; must be non-NULL and non-empty
 * @return 0 on success, -1 when an argument is NULL, src is empty, or memory
 *         cannot be allocated (nothing is appended in that case).
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = strdup(src);
  if (topic == NULL) {
    // the original dereferenced the strdup result unconditionally
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  SArray* container = &list->container;
  if (taosArrayPush(container, &topic) == NULL) {
    // the list did not take ownership; release the copy with the same
    // deallocator tmq_list_destroy uses for list elements
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
359
/**
 * Destroy a topic list: every stored element is released with taosMemoryFree
 * through taosArrayDestroyP, together with the array itself.
 */
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
364 365 366 367 368 369 370 371 372 373
/** Return the number of elements in the list, as reported by taosArrayGetSize. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/**
 * Expose the list's backing storage (the array's pData) as a char* array.
 * No copy is made; the returned pointer aliases the list's own data.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
374 375 376 377
/**
 * Format "<topicName>:<vg>" into dst.
 *
 * The write is unbounded (sprintf), so dst must be large enough for the topic
 * name, the separator, the decimal vgroup id and the terminating NUL.
 *
 * @return the value returned by sprintf (characters written, excluding the NUL).
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
/**
 * Finish a commit round once no responses are outstanding.
 *
 * Async rounds: the automatic commit callback (tmq->commitCb) or the
 * per-call user callback is invoked with pParamSet->rspErr, then pParamSet is
 * freed here. Sync rounds: the waiter blocked on pParamSet->rspSem is woken
 * and remains responsible for pParamSet.
 *
 * @return 0 on success; -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED when the
 *         consumer can no longer be acquired from its refId.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // copy the id first: pParamSet may be freed (here or by the woken waiter)
  // before the reference is released below
  int64_t refId = pParamSet->refId;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (pParamSet->async) {
      taosMemoryFree(pParamSet);
    } else {
      // A synchronous caller is blocked on rspSem and will read rspErr and
      // destroy/free the set itself. Freeing it here (as the original did)
      // leaves that waiter blocked on released memory, so hand it the error.
      pParamSet->rspErr = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParamSet->rspSem);
    }
    return -1;
  }

  if (pParamSet->async) {
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

  // balance the taosAcquireRef above; the original never released it
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}

410 411
/**
 * Response callback for one offset-commit request.
 *
 * Releases the offset that was sent and the response payload, records a
 * failure code in the shared parameter set, and decrements the outstanding
 * response counter; the callback that brings it to zero runs tmqCommitDone.
 *
 * @param param SMqCommitCbParam built by tmqSendCommitReq (the struct itself
 *              is released through the send info's paramFreeFp, not here)
 * @param pBuf  response buffer; its pData is freed here when present
 * @param code  result of the request; non-zero values are stored in rspErr
 * @return always 0
 */
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  taosMemoryFree(pParam->pOffset);
  // same guard as tmqHbCb: tolerate a missing buffer
  if (pBuf && pBuf->pData) taosMemoryFree(pBuf->pData);

  // The original dropped `code`, so rspErr was never set and every commit was
  // reported as successful. Record it before the counter is decremented so the
  // callback that completes the round observes it.
  if (code != 0) {
    pParamSet->rspErr = code;
  }

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }
  return 0;
}

L
Liu Jicong 已提交
438 439 440 441 442 443
/**
 * Build and asynchronously send a commit-offset request for one vgroup.
 *
 * The request carries pVg->currentOffset under the subscription key
 * "<groupId><TMQ_SEPARATOR><topicName>". On success the parameter set's
 * waitingRspNum and totalRspNum are each incremented and tmqCommitCb is
 * registered as the response callback.
 *
 * @return 0 once the request has been handed to asyncSendMsgToServer; -1 on
 *         failure, in which case nothing was sent, the counters are untouched
 *         and everything allocated here has been released.
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset*        pOffset = NULL;
  void*             buf = NULL;
  SMqCommitCbParam* pParam = NULL;
  SMsgSendInfo*     pMsgSendInfo = NULL;
  int32_t           len = 0;
  int32_t           code = 0;

  pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pOffset->val = pVg->currentOffset;

  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }

  // message layout: SMsgHead followed by the encoded offset
  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    // the original wrote through this pointer without checking it
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // account for the response before sending, so the callback's decrement
  // always has a matching increment
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  // nothing has been handed to the transport yet, so everything is still ours
  if (pParam != NULL) taosMemoryFree(pParam);
  if (buf != NULL) taosMemoryFree(buf);
  if (pOffset != NULL) taosMemoryFree(pOffset);
  return -1;
}

/**
 * Commit the offset of the single topic/vgroup that a polled message came from.
 *
 * The matching vgroup is looked up in tmq->clientTopics; a request is sent
 * only if its current offset has a positive type and differs from the last
 * committed offset.
 *
 * @param msg       polled result (data or meta); must not be NULL
 * @param async     non-zero: return right after sending and let the callback
 *                  report the outcome; zero: block until the response arrives
 * @param userCb    callback for async commits (may be NULL)
 * @param userParam opaque value passed to userCb
 * @return TSDB_CODE_TMQ_INVALID_MSG for an unsupported result type; -1 when
 *         the parameter set cannot be allocated or a synchronous send fails;
 *         the recorded response error for a synchronous commit; otherwise 0
 *         (including "nothing to commit" and async failures, which are
 *         reported through userCb).
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // 0: nothing needed committing, 1: a request is in flight, -1: sending failed.
  // Tracked locally because once an async request is in flight the callback
  // may free pParamSet at any moment, so it must not be read again.
  int32_t sendState = 0;

  for (int32_t i = 0; sendState == 0 && i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; sendState == 0 && j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        sendState = (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) ? -1 : 1;
      }
    }
  }

  if (sendState <= 0) {
    // No request references the set (tmqSendCommitReq fails only before it
    // registers anything), so release it here on both outcomes.
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    if (sendState == 0) return 0;

    if (async) {
      if (userCb != NULL) userCb(tmq, -1, userParam);
      return 0;
    }
    return -1;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    int32_t code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    // in sync mode tmqCommitDone only posts the semaphore; the set is ours to free
    taosMemoryFree(pParamSet);
    return code;
  }

  // async: the callback chain owns pParamSet from here on
  return 0;
}

L
Liu Jicong 已提交
576 577
/**
 * Commit consumer offsets.
 *
 * With a message, only that message's topic/vgroup is committed (see
 * tmqCommitMsgImpl). With msg == NULL, every vgroup of every subscribed topic
 * whose current offset has a positive type and differs from its committed
 * offset gets a commit request.
 *
 * @param automatic non-zero when triggered by the auto-commit timer; selects
 *                  tmq->commitCb instead of userCb when the round completes
 * @param async     non-zero: return after sending; zero: wait for all responses
 * @return -1 if the parameter set cannot be allocated; the recorded response
 *         error for a synchronous commit; otherwise 0.
 */
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // Start at 1 so that responses arriving while requests are still being sent
  // cannot drive the counter to zero and complete the round early; this extra
  // count is dropped after the loop.
  pParamSet->waitingRspNum = 1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        // PRId64 instead of %ld: these are 64-bit values and the rest of the
        // file formats them with PRId64
        tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          // best effort: a vgroup whose request could not be built is skipped
          continue;
        }
      }
    }
  }

  // Safe to read: the initial count of 1 is still held, so no callback can
  // have completed (and freed) the round yet.
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }

  if (async) {
    // the callback chain owns pParamSet and reports the outcome
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);

  // the original computed this value and then returned 0 unconditionally
  return code;
}

663
/**
 * Shared body of the delayed-task timer callbacks: queue a task of the given
 * type on the consumer's delayedTask queue and post the consumer's rspSem.
 *
 * @param param    heap-allocated int64_t holding the consumer refId; it is
 *                 always freed here
 * @param taskType one of the TMQ_DELAYED_TASK__* values
 */
static void tmqPostDelayedTask(void* param, int8_t taskType) {
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = taskType;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // balance the acquire above; the original callbacks never released it
    taosReleaseRef(tmqMgmt.rsetId, refId);
  }
  taosMemoryFree(param);
}

/** Timer callback: schedule an endpoint refresh (TMQ_DELAYED_TASK__ASK_EP). Frees param. */
void tmqAssignAskEpTask(void* param, void* tmrId) {
  tmqPostDelayedTask(param, TMQ_DELAYED_TASK__ASK_EP);
}

/** Timer callback: schedule an automatic commit (TMQ_DELAYED_TASK__COMMIT). Frees param. */
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmqPostDelayedTask(param, TMQ_DELAYED_TASK__COMMIT);
}

/** Timer callback: schedule a report task (TMQ_DELAYED_TASK__REPORT). Frees param. */
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmqPostDelayedTask(param, TMQ_DELAYED_TASK__REPORT);
}

699 700 701 702 703 704
/**
 * Response callback for heartbeat requests: the reply carries nothing that is
 * used, so its payload (if any) is simply released.
 *
 * @return always 0
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) return 0;

  if (pMsg->pData != NULL) {
    taosMemoryFree(pMsg->pData);
  }
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
705 706 707 708 709
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    return;
  }
710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
739
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
740 741
}

L
Liu Jicong 已提交
742 743 744 745 746 747 748 749
/**
 * Drain the consumer's delayedTask queue and run each task.
 *
 * - TMQ_DELAYED_TASK__ASK_EP: refresh endpoints asynchronously and re-arm the
 *   ask-ep timer (1000 timer units).
 * - TMQ_DELAYED_TASK__COMMIT: start an automatic async commit and re-arm the
 *   commit timer with tmq->autoCommitInterval.
 * - TMQ_DELAYED_TASK__REPORT: currently a no-op.
 *
 * Each re-armed timer receives a freshly allocated copy of the consumer refId,
 * which the corresponding timer callback frees.
 *
 * @return 0 normally; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 *         temporary queue iterator cannot be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
      } else {
        // the original wrote through the NULL pointer; skip re-arming instead
        tscError("consumer:%" PRId64 ", failed to re-arm ask-ep timer since out of memory", tmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
      } else {
        tscError("consumer:%" PRId64 ", failed to re-arm commit timer since out of memory", tmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do yet
    } else {
      ASSERT(0);
    }

    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
774
/**
 * Discard every response the consumer has not processed yet: first the items
 * already staged in tmq->qall, then everything still pending in tmq->mqueue.
 *
 * Only the queue items themselves are released (taosFreeQitem).
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  // pass 0 drains what is already staged in qall; pass 1 moves the pending
  // mqueue items into qall and drains again
  for (int32_t pass = 0; pass < 2; pass++) {
    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }

    while (1) {
      // Reset before every fetch, as tmqHandleAllDelayedTask does: if
      // taosGetQitem leaves the output untouched when the list is empty, a
      // stale pointer from the previous iteration would be freed twice.
      SMqRspWrapper* msg = NULL;
      taosGetQitem(tmq->qall, (void**)&msg);
      if (msg == NULL) break;
      taosFreeQitem(msg);
    }
  }
}

D
dapan1121 已提交
795
/**
 * Response callback for a subscribe request: stores the result code in the
 * caller's SMqSubscribeCbParam and posts its semaphore.
 *
 * @return always 0
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* cbParam = param;

  cbParam->rspErr = code;
  tsem_post(&cbParam->rspSem);

  return 0;
}
801

L
Liu Jicong 已提交
802
/**
 * Append the consumer's currently subscribed topics to *topics.
 *
 * A list is created when *topics is NULL; the caller owns it either way. Each
 * stored topic name is reported without its prefix up to and including the
 * first '.'.
 * NOTE(review): that prefix is presumably the database/account qualifier —
 * confirm against where topicName is filled in.
 *
 * @return 0 on success; -1 when an argument is NULL, the list cannot be
 *         created, or a name cannot be appended.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (tmq == NULL || topics == NULL) return -1;

  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr returns NULL when there is no '.'; the original added 1 to that
    // result unconditionally. Fall back to the whole name in that case.
    const char* dot = strchr(topic->topicName, '.');
    const char* shortName = (dot != NULL) ? dot + 1 : topic->topicName;

    if (tmq_list_append(*topics, shortName) < 0) {
      return -1;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
813
/**
 * Unsubscribe from all topics by subscribing to an empty topic list.
 *
 * While tmq_subscribe answers TSDB_CODE_MND_CONSUMER_NOT_READY the call is
 * retried after a 500 ms pause (taosMsleep), up to six retries.
 *
 * @return the result of the last tmq_subscribe call, or -1 with terrno =
 *         TSDB_CODE_OUT_OF_MEMORY when the empty list cannot be created.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  int32_t     rsp;
  int32_t     retryCnt = 0;
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    // the original passed the NULL list on to tmq_subscribe / tmq_list_destroy
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    }
    retryCnt++;
    taosMsleep(500);
  }

  tmq_list_destroy(lst);
  return rsp;
}

831 832
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
833

834 835 836 837
  // TODO stop timer
  if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
  if (tmq->qall) taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
838

839
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
840

841 842 843 844 845 846 847 848 849 850
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
    int32_t vgSz = taosArrayGetSize(pTopic->vgs);
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
851 852
}

L
Liu Jicong 已提交
853
// Create a new consumer from a configuration.
//
// conf:      consumer configuration; groupId must be non-empty
// errstr:    currently unused
// errstrLen: currently unused
//
// On first use the shared TMQ timer and the consumer reference set are
// created. Returns the new consumer, or NULL on failure.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here, so only the configuration can be reported
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  // set conf and assign consumerId first, so that every error message below
  // reports the real consumer id and group
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->hbBgEnable = conf->hbBgEnable;
  pTmq->consumerId = tGenIdPI64();

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pTmq->hbBgEnable) {
    // the timer parameter is only needed (and only allocated) when the
    // background heartbeat is enabled
    int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      // the consumer is already registered in the reference set, so it has to
      // be released through it
      // NOTE(review): assumes removing the only reference invokes tmqFreeImpl
      taosRemoveRef(tmqMgmt.rsetId, pTmq->refId);
      return NULL;
    }
    *pRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
958
// Subscribe the consumer to the given topics.
//
// tmq:        consumer handle
// topic_list: topic names to subscribe to (an empty list unsubscribes)
//
// Sends the subscribe request to the mnode and blocks until the response
// arrives, then polls the endpoint information until the consumer is ready
// and starts the ask-ep timer and, if enabled, the auto-commit timer.
//
// Returns 0 on success. On failure returns the error code of the response,
// TSDB_CODE_MND_CONSUMER_NOT_READY when the consumer did not become ready in
// time, TSDB_CODE_OUT_OF_MEMORY when a timer could not be set up, or -1 for a
// failure while building the request.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  // convert every topic into its full name
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    // the array owns the name only once the push succeeded
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      taosMemoryFree(topicFName);
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // nothing was sent yet, so sendInfo is still owned by this function
    taosMemoryFree(sendInfo);
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;

  // param lives on this stack frame, so wait here until the callback is done
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // code is 0 at this point; report the failure instead of success
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1064
// Register the commit callback in a consumer configuration.
//
// conf:  configuration to update
// cb:    commit callback
// param: user pointer stored alongside the callback
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1069

D
dapan1121 已提交
1070
// Response callback of a poll request.
//
// param: the SMqPollCbParam supplied with the request; freed here
// pMsg:  response buffer; pMsg->pData is freed here on every path
// code:  result code of the request
//
// A successful response is decoded into an SMqPollRspWrapper and written to
// tmq->mqueue. tmq->rspSem is posted on every path where the consumer still
// exists. Returns 0 when the response was queued or dropped because it
// belongs to an earlier epoch, -1 otherwise.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  int64_t         refId = pParam->refId;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    // NOTE(review): no tsem_init for this rspSem is visible in the part of the
    // poll path shown here - confirm that destroying it is intended
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // copy what is still needed, then release the request parameter
  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
  taosMemoryFree(pParam);

  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        // pMsg->pData was already released above; freeing it again here
        // would be a double free
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosReleaseRef(tmqMgmt.rsetId, refId);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // the body follows the SMqRspHead; after decoding, the head bytes are copied
  // over the start of the decoded response
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(0);
  }

  taosMemoryFree(pMsg->pData);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  // balance the taosAcquireRef above
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;

CREATE_MSG_FAIL:
  // pVg is only touched when the epoch is unchanged since the request
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return -1;
}

L
Liu Jicong 已提交
1180
// Replace the consumer's topic/vgroup table with the one from an ask-ep
// response and move the consumer to the given epoch.
//
// tmq:   consumer handle
// epoch: epoch to store in tmq->epoch
// pRsp:  decoded ask-ep response
//
// The current offset of every vgroup that is present in both the old and the
// new table is carried over (matched by "<topicName>:<vgId>"); new vgroups
// start from tmq->resetOffsetCfg. The consumer status becomes NO_TOPIC when
// the new table is empty and READY otherwise.
//
// Returns true if at least one vgroup was added to the new table; false
// otherwise, including when an allocation failed and nothing was changed.
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // remember the current offset of every vgroup known so far
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  // build the new table from the response
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // release the old table
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
      taosArrayDestroy(pTopic->vgs);
    }
    taosArrayDestroy(tmq->clientTopics);
  }
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1272
// Response callback of an ask-ep request sent by tmqAskEp().
//
// param: the SMqAskEpCbParam supplied with the request
// pMsg:  response buffer; pMsg->pData is freed here on every path
// code:  result code of the request
//
// Synchronous request (param->async == 0): the result code is stored in
// param->code, a response with a newer epoch is applied directly with
// tmqUpdateEp, and param->rspSem is posted to wake up tmqAskEp, which frees
// param. Asynchronous request: a response with a newer epoch is queued on
// tmq->mqueue and param is freed here.
//
// Returns code, or -1 if the consumer no longer exists or the queue item
// could not be allocated.
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  int64_t          refId = pParam->refId;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (!async) {
      // tmqAskEp is blocked on rspSem and frees pParam itself: hand it the
      // error and wake it up instead of destroying the semaphore under it
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      tsem_destroy(&pParam->rspSem);
      taosMemoryFree(pParam);
    }
    taosMemoryFree(pMsg->pData);
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // pParam must not be touched after this post: the waiter frees it
    tsem_post(&pParam->rspSem);
  } else {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  taosMemoryFree(pMsg->pData);

  // balance the taosAcquireRef above
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return code;
}

L
Liu Jicong 已提交
1338
// Ask the mnode for the current topic endpoints of the consumer.
//
// tmq:   consumer handle
// async: when false, block until tmqAskEpCb has processed the response and
//        return the code it stored; when true, return right after sending and
//        leave the callback parameter to tmqAskEpCb
//
// Returns 0 (async) or the response code (sync); -1 if the request could not
// be built.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = 0;

  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  // the request is sent in network byte order
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    tscError("failed to init subscribe param semaphore");
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // in sync mode pParam is owned here; in async mode tmqAskEpCb frees it
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

1408
// Allocate and fill a poll request for one vgroup of one topic.
//
// tmq:     consumer handle
// timeout: value stored in the request's timeout field
// pTopic:  topic to poll
// pVg:     vgroup to poll; its currentOffset becomes the request offset
//
// Returns the zero-initialised and filled request, or NULL if the allocation
// failed.
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pPollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pPollReq == NULL) return NULL;

  // subKey = <groupId> TMQ_SEPARATOR <topicName>
  int32_t prefixLen = strlen(tmq->groupId);
  memcpy(pPollReq->subKey, tmq->groupId, prefixLen);
  pPollReq->subKey[prefixLen] = TMQ_SEPARATOR;
  strcpy(pPollReq->subKey + prefixLen + 1, pTopic->topicName);

  // consumer side settings
  pPollReq->withTbName = tmq->withTbName;
  pPollReq->timeout = timeout;
  pPollReq->consumerId = tmq->consumerId;
  pPollReq->epoch = tmq->epoch;
  pPollReq->reqOffset = pVg->currentOffset;
  pPollReq->reqId = generateRequestId();
  pPollReq->useSnapshot = tmq->useSnapshot;

  // message head, converted with htonl
  pPollReq->head.vgId = htonl(pVg->vgId);
  pPollReq->head.contLen = htonl(sizeof(SMqPollReq));

  return pPollReq;
}

L
Liu Jicong 已提交
1437 1438
// Build the meta result object from a queued poll response.
//
// pWrapper: poll response whose metaRsp, topic and vgroup handles are used
//
// The metaRsp is copied bytewise into the new object. Returns the new object,
// or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if the allocation failed.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1448 1449 1450
// Build the data result object from a queued poll response.
//
// pWrapper: poll response whose dataRsp, topic and vgroup handles are used
//
// The dataRsp is copied bytewise into the new object. When the response does
// not carry its own schema, the schema of the topic is installed in the
// result info. Returns the new object, or NULL
// (terrno = TSDB_CODE_OUT_OF_MEMORY) if the allocation failed.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483
// Build the taosx result object from a queued poll response.
//
// pWrapper: poll response whose taosxRsp, topic and vgroup handles are used
//
// The taosxRsp (the member tmqPollCb decodes a TMQ_MSG_TYPE__TAOSX_RSP into)
// is copied bytewise into the new object. When the response does not carry
// its own schema, the schema of the topic is installed in the result info.
// Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if the
// allocation failed.
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TAOSX;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;

  // copy exactly the size of the destination member; copying the size of the
  // whole object would write past the end of the allocation
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(pRspObj->rsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  // NOTE(review): reads withSchema through dataRsp as before - presumably
  // dataRsp and taosxRsp overlay each other; confirm against the wrapper type
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1484
/*
 * Send one asynchronous poll request to every vgroup, of every client topic, that is currently idle.
 *
 * A vgroup is claimed by atomically switching vgStatus from TMQ_VG_STATUS__IDLE to TMQ_VG_STATUS__WAIT.
 * A vgroup found in any other state is skipped and only its skip counter is incremented.
 *
 * Returns 0 once all topics have been walked. Returns -1 as soon as the request cannot be built or the
 * callback parameter / send info cannot be allocated; in that case the claimed vgroup is set back to
 * IDLE and tmq->rspSem is posted before returning.
 */
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  SMqClientVg*    pVg = NULL;
  SMqPollReq*     pReq = NULL;
  SMqPollCbParam* pParam = NULL;

  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // try to claim the vgroup; the returned value is the status seen before the exchange
      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__IDLE) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 skipped);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        goto _reqFailed;
      }

      pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pParam == NULL) {
        goto _paramFailed;
      }
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      pParam->vgId = pVg->vgId;

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        goto _sendInfoFailed;
      }

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      char offsetFormatBuf[80];
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);

      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;

  // staged cleanup: each label releases what had been acquired before the failing step
_sendInfoFailed:
  taosMemoryFree(pParam);
_paramFailed:
  taosMemoryFree(pReq);
_reqFailed:
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&tmq->rspSem);
  return -1;
}

L
Liu Jicong 已提交
1562 1563
/*
 * Handle a queued response that is not a poll response.
 *
 * Only TMQ_MSG_TYPE__EP_RSP is accepted; any other type yields -1 and leaves *pReset untouched.
 * For an endpoint response whose epoch is greater than the consumer's current epoch, the endpoint
 * information is applied through tmqUpdateEp and *pReset is set to true; otherwise *pReset is false.
 *
 * Returns 0 for an endpoint response, -1 otherwise. The wrapper is not freed here.
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewer = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewer) {
    SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
    tmqUpdateEp(tmq, rspWrapper->epoch, pEpRsp);
  }

  *pReset = isNewer;
  return 0;
}

L
Liu Jicong 已提交
1580
/*
 * Drain queued responses until one produces a result object or the queues are empty.
 *
 * Items are taken from tmq->qall; when it is empty it is refilled once from tmq->mqueue.
 *  - END_RSP:       the item is freed, terrno is set to TSDB_CODE_TQ_NO_COMMITTED_OFFSET, NULL is returned.
 *  - POLL_RSP / POLL_META_RSP / TAOSX_RSP: if the response epoch equals the consumer epoch, the vgroup's
 *    current offset is advanced and the vgroup is marked IDLE. Data / taosx responses with zero blocks
 *    are dropped; otherwise a result object is built and returned. Responses from another epoch are
 *    logged and discarded.
 *  - anything else: handed to tmqHandleNoPollRsp; if that reports a reset and pollIfReset is set,
 *    tmqPollImpl is invoked again with the given timeout.
 *
 * Returns the result object, or NULL when nothing is available (or END_RSP was seen).
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) {
        return NULL;
      }
    }

    int8_t rspType = rspWrapper->tmqRspType;

    if (rspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPollWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pPollWrapper->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPollWrapper->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(pPollWrapper);
        continue;
      }

      SMqClientVg* pVg = pPollWrapper->vgHandle;
      pVg->currentOffset = pPollWrapper->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
      if (pPollWrapper->dataRsp.blockNum == 0) {
        // nothing to hand out, look at the next queued item
        taosFreeQitem(pPollWrapper);
        continue;
      }

      SMqRspObj* pDataRes = tmqBuildRspFromWrapper(pPollWrapper);
      taosFreeQitem(pPollWrapper);
      return pDataRes;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPollWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pPollWrapper->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPollWrapper->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(pPollWrapper);
        continue;
      }

      SMqClientVg* pVg = pPollWrapper->vgHandle;
      pVg->currentOffset = pPollWrapper->metaRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaRes = tmqBuildMetaRspFromWrapper(pPollWrapper);
      taosFreeQitem(pPollWrapper);
      return pMetaRes;
    }

    if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pPollWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);
      if (pPollWrapper->taosxRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPollWrapper->taosxRsp.head.epoch, curEpoch);
        taosFreeQitem(pPollWrapper);
        continue;
      }

      SMqClientVg* pVg = pPollWrapper->vgHandle;
      pVg->currentOffset = pPollWrapper->taosxRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
      if (pPollWrapper->taosxRsp.blockNum == 0) {
        taosFreeQitem(pPollWrapper);
        continue;
      }

      SMqRspObj* pTaosxRes = tmqBuildRspFromWrapper(pPollWrapper);
      taosFreeQitem(pPollWrapper);
      return pTaosxRes;
    }

    // every remaining type goes through the non-poll handler
    bool needReset = false;
    tmqHandleNoPollRsp(tmq, rspWrapper, &needReset);
    taosFreeQitem(rspWrapper);
    if (pollIfReset && needReset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1673
/*
 * Poll the consumer for the next result.
 *
 * tmq:     the consumer handle.
 * timeout: value compared against millisecond timestamps from taosGetTimestampMs(); -1 means keep
 *          polling until a result (or a terminating condition) shows up.
 *
 * Returns a result object, or NULL when
 *  - the consumer is still in TMQ_CONSUMER_STATUS__INIT,
 *  - the consumer is in RECOVER state and tmqAskEp keeps answering "consumer not ready" after the retries,
 *  - sending the poll requests fails,
 *  - terrno is TSDB_CODE_TQ_NO_COMMITTED_OFFSET after handling the responses,
 *  - the timeout has elapsed without any result.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) {
      return NULL;
    }

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait for the time that is still LEFT, not for the time already spent. Waiting on the elapsed
      // time makes the first wait ~0 (busy polling) and later waits can overshoot the deadline.
      tsem_timewait(&tmq->rspSem, (timeout - elapsed) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1728
/*
 * Close a consumer.
 *
 * When the consumer is in TMQ_CONSUMER_STATUS__READY, offsets are committed synchronously first; a
 * non-zero commit result is returned as-is and nothing else is done. Then an empty topic list is
 * subscribed, retrying every 500 ms while the answer is TSDB_CODE_MND_CONSUMER_NOT_READY (the loop
 * stops once retryCnt exceeds 5). The outcome of that subscribe call is deliberately not propagated.
 *
 * In every non-error path the consumer's reference is removed from the tmq reference set, so the
 * READY path no longer returns early and leaks the reference.
 *
 * Returns 0 on success, or the non-zero result of the synchronous commit.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }
      retryCnt++;
      taosMsleep(500);
    }

    tmq_list_destroy(lst);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1755

L
Liu Jicong 已提交
1756 1757
/*
 * Translate a tmq error code into a message.
 *
 * 0 maps to "success", -1 maps to "fail", every other code is delegated to tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1765

L
Liu Jicong 已提交
1766 1767 1768 1769
/*
 * Classify a result object.
 *
 * Data results give TMQ_RES_DATA. Meta results give TMQ_RES_TABLE_META, except that a meta result
 * whose resMsgType is TDMT_VND_DELETE is reported as TMQ_RES_TAOSX. Taosx results give TMQ_RES_TAOSX.
 * Anything else gives TMQ_RES_INVALID.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    return (pMeta->metaRsp.resMsgType == TDMT_VND_DELETE) ? TMQ_RES_TAOSX : TMQ_RES_TABLE_META;
  }

  if (TD_RES_TMQ_TAOSX(res)) {
    return TMQ_RES_TAOSX;
  }

  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1782
/*
 * Return the topic name of a tmq result, without the prefix that precedes the first '.'.
 *
 * Data, meta and taosx results are supported; any other result yields NULL. If the stored name
 * contains no '.', the stored name itself is returned instead of computing a pointer from a NULL
 * strchr() result. The returned pointer refers to storage inside the result object.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_TAOSX(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1794 1795 1796 1797
/*
 * Return the database name of a tmq result, without the prefix that precedes the first '.'.
 *
 * Data, meta and taosx results are supported; any other result yields NULL. If the stored name
 * contains no '.', the stored name itself is returned instead of computing a pointer from a NULL
 * strchr() result. The returned pointer refers to storage inside the result object.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_TAOSX(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1806 1807 1808 1809
/*
 * Return the vgroup id recorded in a tmq result.
 *
 * Data, meta and taosx results are supported; any other result yields -1.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_TAOSX(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1817 1818 1819 1820 1821 1822 1823 1824

/*
 * Return the table name that belongs to the block the result iterator currently points at.
 *
 * Only data results are supported. NULL is returned when the result is of another kind, when the
 * response carries no table names, or when resIter is outside [0, blockNum).
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pDataRes = (SMqRspObj*)res;
  bool       hasNames = pDataRes->rsp.withTbName && pDataRes->rsp.blockTbName != NULL;
  bool       iterInRange = pDataRes->resIter >= 0 && pDataRes->resIter < pDataRes->rsp.blockNum;
  if (!hasNames || !iterInRange) {
    return NULL;
  }

  return (const char*)taosArrayGetP(pDataRes->rsp.blockTbName, pDataRes->resIter);
}
1829

L
Liu Jicong 已提交
1830
/*
 * Commit through tmqCommitInner, forwarding the caller's callback and its parameter.
 *
 * The two fixed flags passed to tmqCommitInner are 0 and 1.
 * NOTE(review): presumably the second flag selects asynchronous mode - confirm against tmqCommitInner.
 * The value returned by tmqCommitInner is not propagated.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

1835 1836
/*
 * Commit through tmqCommitInner without a callback and return its result code.
 *
 * Both fixed flags passed to tmqCommitInner are 0, and the callback and its parameter are NULL.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}