clientTmq.c 64.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34
#if 0
#undef tsem_post
#define tsem_post(x)                                         \
  tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \
  sem_post(x)
#endif

X
Xiaoyu Wang 已提交
35
struct SMqMgmt {
36 37 38
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
39
};
L
Liu Jicong 已提交
40

X
Xiaoyu Wang 已提交
41 42
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
43
static struct SMqMgmt tmqMgmt = {0};
44

L
Liu Jicong 已提交
45 46 47 48 49 50
// Common leading fields of every response item; tmqFreeRspWrapper() reads tmqRspType
// and casts the item to the matching concrete wrapper type.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
51 52 53
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
54 55
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
56
struct tmq_list_t {
L
Liu Jicong 已提交
57
  SArray container;
L
Liu Jicong 已提交
58
};
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
struct tmq_conf_t {
61 62 63 64 65 66 67 68
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
69 70 71 72 73
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
74
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
75
  void*          commitCbUserParam;
L
Liu Jicong 已提交
76 77 78
};

struct tmq_t {
79
  int64_t refId;
L
Liu Jicong 已提交
80
  // conf
81 82 83 84 85 86 87 88 89
  char     groupId[TSDB_CGROUP_LEN];
  char     clientId[256];
  int8_t   withTbName;
  int8_t   useSnapshot;
  int8_t   autoCommit;
  int32_t  autoCommitInterval;
  int32_t  resetOffsetCfg;
  uint64_t consumerId;
  bool     hbBgEnable;
90

L
Liu Jicong 已提交
91 92
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
93 94 95 96

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
97 98
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
99
  int32_t epSkipCnt;
L
Liu Jicong 已提交
100
#endif
L
Liu Jicong 已提交
101 102
  int64_t pollCnt;

L
Liu Jicong 已提交
103
  // timer
104 105
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
106 107 108
  tmr_h reportTimer;
  tmr_h commitTimer;

H
Haojun Liao 已提交
109 110 111 112 113 114 115
  STscObj*      pTscObj;       // connection
  SArray*       clientTopics;  // SArray<SMqClientTopic>
  STaosQueue*   mqueue;        // queue of rsp
  STaosQall*    qall;
  STaosQueue*   delayedTask;   // delayed task queue for heartbeat and auto commit
  TdThreadMutex lock;          // used to protect the operation on each topic, when updating the epsets.
  tsem_t        rspSem;
L
Liu Jicong 已提交
116 117
};

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
// Values for SMqClientVg.vgStatus.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status values (presumably stored in tmq_t.status -- confirm against users of the field).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
130
// Task types written (as an int8_t queue item) to tmq_t.delayedTask by the timer
// callbacks and dispatched by tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
136
typedef struct {
137 138 139
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
142
  // connection info
143
  int32_t vgId;
X
Xiaoyu Wang 已提交
144
  int32_t vgStatus;
L
Liu Jicong 已提交
145
  int32_t vgSkipCnt;
146 147 148
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
149
typedef struct {
150 151 152
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
153
  SSchemaWrapper schema;
154 155
} SMqClientTopic;

L
Liu Jicong 已提交
156 157 158 159 160
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
161
  union {
L
Liu Jicong 已提交
162 163
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
164
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
165
  };
L
Liu Jicong 已提交
166 167
} SMqPollRspWrapper;

L
Liu Jicong 已提交
168
typedef struct {
169 170
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
171 172
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
173
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
174

L
Liu Jicong 已提交
175
typedef struct {
176 177
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
178
  int32_t code;
L
Liu Jicong 已提交
179
  int32_t async;
X
Xiaoyu Wang 已提交
180
  tsem_t  rspSem;
181 182
} SMqAskEpCbParam;

L
Liu Jicong 已提交
183
typedef struct {
184 185
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
186
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
187
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
188
  int32_t         vgId;
L
Liu Jicong 已提交
189
  tsem_t          rspSem;
H
Haojun Liao 已提交
190
  uint64_t        requestId; // request id for debug purpose
X
Xiaoyu Wang 已提交
191
} SMqPollCbParam;
192

193
typedef struct {
194 195
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
196 197
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
198 199
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
200
  int32_t        rspErr;
201
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
202 203 204 205
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
206 207 208 209 210
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
H
Haojun Liao 已提交
211 212 213
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
  int32_t              vgId;
  tmq_t*               pTmq;
214
} SMqCommitCbParam;
215

216
// Forward declarations of helpers defined later in this file.
static int32_t tmqAskEp(tmq_t* tmq, bool async);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
219

220
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
221
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
222 223 224 225 226
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

227
  conf->withTbName = false;
L
Liu Jicong 已提交
228
  conf->autoCommit = true;
L
Liu Jicong 已提交
229
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
230
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
231
  conf->hbBgEnable = true;
232

233 234 235
  return conf;
}

L
Liu Jicong 已提交
236
// Release a configuration together with the connection strings it owns. NULL is accepted.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  // taosMemoryFree() is already called on possibly-NULL pointers elsewhere in this file,
  // so the strings do not need individual guards.
  taosMemoryFree(conf->ip);
  taosMemoryFree(conf->user);
  taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
246
  if (strcmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
247
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
248
    return TMQ_CONF_OK;
249
  }
L
Liu Jicong 已提交
250

251
  if (strcmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
252
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
253 254
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
255

L
Liu Jicong 已提交
256 257
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
258
      conf->autoCommit = true;
L
Liu Jicong 已提交
259 260
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
261
      conf->autoCommit = false;
L
Liu Jicong 已提交
262 263 264 265
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
266
  }
L
Liu Jicong 已提交
267

L
Liu Jicong 已提交
268 269 270 271 272
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
273 274 275 276 277 278 279 280 281 282 283 284 285 286
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
287

288 289
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
290
      conf->withTbName = true;
L
Liu Jicong 已提交
291
      return TMQ_CONF_OK;
292
    } else if (strcmp(value, "false") == 0) {
293
      conf->withTbName = false;
L
Liu Jicong 已提交
294
      return TMQ_CONF_OK;
295 296 297 298 299
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
300
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
301
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
302
      conf->snapEnable = true;
303 304
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
305
      conf->snapEnable = false;
306 307 308 309 310 311
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
312 313 314 315 316
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

317 318 319
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
320 321
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
322
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
323 324 325 326
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
327
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
328 329
  }

L
Liu Jicong 已提交
330
  if (strcmp(key, "td.connect.ip") == 0) {
331
    conf->ip = taosStrdup(value);
L
Liu Jicong 已提交
332 333
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
334
  if (strcmp(key, "td.connect.user") == 0) {
335
    conf->user = taosStrdup(value);
L
Liu Jicong 已提交
336 337
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
338
  if (strcmp(key, "td.connect.pass") == 0) {
339
    conf->pass = taosStrdup(value);
L
Liu Jicong 已提交
340 341
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
342
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
343 344 345
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
346
  if (strcmp(key, "td.connect.db") == 0) {
347
    /*conf->db = taosStrdup(value);*/
L
Liu Jicong 已提交
348 349 350
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
351
  return TMQ_CONF_UNKNOWN;
352 353 354
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
355 356
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
357 358
}

L
Liu Jicong 已提交
359 360
// Append a heap copy of topic name src to the list. Names that do not start with a backquote
// are lower-cased. Returns 0 on success, -1 when list or src is NULL, src is empty, or an
// allocation fails (nothing is added and nothing is leaked in that case).
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = taosStrdup(src);
  if (topic == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  if (taosArrayPush(&list->container, &topic) == NULL) {
    taosMemoryFree(topic);  // the array did not take ownership
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
370
// Free the list and every topic string stored in it.
void tmq_list_destroy(tmq_list_t* list) {
  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
375 376 377 378 379 380 381 382 383 384
// Number of topics currently stored in the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Expose the list's backing storage as an array of C strings. The returned pointer is the
// array's own pData buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* topics = &list->container;
  char**        names = topics->pData;
  return names;
}

L
Liu Jicong 已提交
385 386 387 388 389 390 391
// Atomically drop one pending response from the commit set; whoever brings the counter
// to zero finishes the commit through tmqCommitDone().
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
  if (atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1) != 0) {
    return;
  }
  tmqCommitDone(pParamSet);
}

H
Haojun Liao 已提交
392
// Response callback of one per-vgroup commit request.
//
// When the response carries an ep set, the matching vgroup of the committed topic is updated
// under the consumer lock. The callback then frees the offset, the response payload and the
// ep set, and counts the response down on the shared commit set.
//
// NOTE(review): `code` is not inspected here and is not stored in pParamSet->rspErr --
// confirm whether a failed commit is meant to be reported by tmqCommitDone().
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  if (pBuf->pEpSet != NULL) {
    // todo extract method
    taosThreadMutexLock(&pParam->pTmq->lock);

    int32_t numOfTopics = taosArrayGetSize(pParam->pTmq->clientTopics);
    for (int32_t i = 0; i < numOfTopics; ++i) {
      SMqClientTopic* pTopic = taosArrayGet(pParam->pTmq->clientTopics, i);
      if (strcmp(pTopic->topicName, pParam->topicName) != 0) {
        continue;
      }

      int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
      for (int32_t j = 0; j < numOfVgs; ++j) {
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
        if (pClientVg->vgId == pParam->vgId) {
          SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
          SEp* pOld = GET_ACTIVE_EP(&(pClientVg->epSet));
          uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pParam->vgId,
                 pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
          pClientVg->epSet = *pBuf->pEpSet;
          break;
        }
      }

      // only the first topic with a matching name is considered
      break;
    }

    taosThreadMutexUnlock(&pParam->pTmq->lock);
  }

  taosMemoryFree(pParam->pOffset);
  taosMemoryFree(pBuf->pData);
  taosMemoryFree(pBuf->pEpSet);

  tmqCommitRspCountDown(pParamSet);
  return 0;
}

H
Haojun Liao 已提交
446
// Build and asynchronously send a TDMT_VND_TMQ_COMMIT_OFFSET request committing
// pVg->currentOffset for "<groupId><TMQ_SEPARATOR><pTopicName>".
//
// On success (0) pParamSet->waitingRspNum and totalRspNum are incremented, pVg->committedOffset
// is set to pVg->currentOffset, and the offset, message buffer and callback parameter are
// handed to the send path. On failure (-1) everything allocated here is released.
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet) {
  void*             buf = NULL;
  SMqCommitCbParam* pParam = NULL;
  SMsgSendInfo*     pMsgSendInfo = NULL;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pOffset->val = pVg->currentOffset;

  // subKey = groupId + separator + topic name
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopicName);

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    goto FAIL;  // previously returned here without freeing pOffset
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    goto FAIL;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto FAIL;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;
  pParam->vgId = pVg->vgId;
  pParam->pTmq = tmq;

  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    goto FAIL;
  }

  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d", tmq->consumerId,
           pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);

  // TODO: put into cb, the commit offset should be move to the callback function
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;

  // account for the response before the request can possibly complete
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  // send msg
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  taosMemoryFree(pParam);
  taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

H
Haojun Liao 已提交
535
// Commit the offset of the single (topic, vgroup) that produced `msg`.
//
// msg must be a TMQ data, meta or metadata result; anything else yields
// TSDB_CODE_TMQ_INVALID_MSG. Nothing is sent, and 0 is returned, when the vgroup is not found
// or its current offset has already been committed.
//
// async == 0: blocks until the response arrived and returns the commit result; returns -1
//             when the request could not be sent.
// async != 0: returns 0; a send failure is reported through userCb (when non-NULL).
static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = -1;

  taosThreadMutexLock(&tmq->lock);
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) {
        continue;
      }

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic->topicName, pParamSet) < 0) {
          tsem_destroy(&pParamSet->rspSem);
          taosMemoryFree(pParamSet);
          goto FAIL;
        }
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    // nothing to commit
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    taosThreadMutexUnlock(&tmq->lock);
    return 0;
  }

  if (!async) {
    // Drop the consumer lock before blocking: tmqCommitCb() takes the same lock when the
    // response carries an ep set, so waiting with the lock held can deadlock.
    taosThreadMutexUnlock(&tmq->lock);
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return code;
  }
  code = 0;

FAIL:
  taosThreadMutexUnlock(&tmq->lock);
  if (code == 0) {
    return 0;
  }
  if (!async) {
    return code;  // a synchronous caller must see the failure
  }
  if (userCb != NULL) {
    userCb(tmq, code, userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
619 620
// Commit the current offset of every vgroup of every subscribed topic whose current offset
// differs from the committed one.
//
// async == 0: blocks until all responses arrived and returns the commit result.
// async != 0: returns 0 once the requests are sent.
// Returns 0 without sending anything when no vgroup needs a commit, and -1 when the shared
// commit state cannot be allocated (for async commits the failure is additionally reported
// through the automatic or user callback, when one is set).
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                                     void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    if (async) {
      tmq_commit_cb* cb = automatic ? tmq->commitCb : userCb;
      void*          cbParam = automatic ? tmq->commitCbUserParam : userParam;
      if (cb != NULL) {
        cb(tmq, TSDB_CODE_OUT_OF_MEMORY, cbParam);
      }
    }
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue: responses arriving while requests are still being
  // sent must not complete the commit
  pParamSet->waitingRspNum = 1;

  taosThreadMutexLock(&tmq->lock);
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      // Work on the array element itself (the consumer lock is held): tmqSendCommitReq()
      // records committedOffset on the vg it is given, and that update must not be lost
      // on a stack copy or the same offset is re-committed on every round.
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic->topicName, pParamSet) < 0) {
          continue;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
      }
    }
  }

  taosThreadMutexUnlock(&tmq->lock);

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // count down since waiting rsp num init as 1
  tmqCommitRspCountDown(pParamSet);

  if (async) {
    // pParamSet is not touched again here; it is left to the response path
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

697 698
// Commit dispatcher: with a result object only that result's (topic, vgroup) is committed,
// with msg == NULL every subscribed vgroup is.
static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                              void* userParam) {
  if (msg == NULL) {
    return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
  }
  return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
}

706
void tmqAssignAskEpTask(void* param, void* tmrId) {
707 708 709
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
710
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
711 712 713 714 715
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
716 717 718
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
719 720 721
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
722
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
723 724 725 726 727
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
728 729 730
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
731 732 733
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
734
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
735 736 737 738 739
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
740 741
}

742
// Heartbeat response callback: the reply content is not used, only its buffers are released.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
751 752 753
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
754
    taosMemoryFree(param);
755 756
    return;
  }
D
dapan1121 已提交
757 758 759 760 761

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
762
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
763 764 765 766
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
    return;
  }
L
Liu Jicong 已提交
767
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
768 769 770 771 772 773 774 775 776
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
    return;
  }
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
    return;
  }
777 778 779 780

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
781
    goto OVER;
782 783 784
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
D
dapan1121 已提交
785
      .len = tlen,
786 787 788 789 790 791 792
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
793
  sendInfo->msgType = TDMT_MND_TMQ_HB;
794 795 796 797 798 799 800

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
801
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
802 803
}

804
// Drain the consumer's delayed-task queue and run each task:
//   ASK_EP -> asynchronous tmqAskEp(), then re-arm the ask-ep timer (1000 ms)
//   COMMIT -> asynchronous automatic commit of all offsets, then re-arm the commit timer
//             (autoCommitInterval ms)
//   REPORT -> no action
// Returns 0, or TSDB_CODE_OUT_OF_MEMORY when the temporary queue cannot be allocated.
// When the refId for a timer cannot be allocated, that timer is not re-armed.
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(pTmq, true);

      // the timer callback owns and frees this refId copy
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      } else {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        tscError("consumer:0x%" PRIx64 " failed to re-arm ask-ep timer, out of memory", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);

      // the timer callback owns and frees this refId copy
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      } else {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        tscError("consumer:0x%" PRIx64 " failed to re-arm commit timer, out of memory", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872
// Release the payload owned by a response item according to its tmqRspType.
// The item itself is not freed here; unknown types are left untouched.
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__END_RSP:
      // no payload
      break;

    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pEpWrapper->msg);
      break;
    }

    case TMQ_MSG_TYPE__POLL_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      SMqDataRsp*        pData = &pPoll->dataRsp;
      taosArrayDestroyP(pData->blockData, taosMemoryFree);
      taosArrayDestroy(pData->blockDataLen);
      taosArrayDestroyP(pData->blockTbName, taosMemoryFree);
      taosArrayDestroyP(pData->blockSchema, (FDelete)tDeleteSSchemaWrapper);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFree(pPoll->metaRsp.metaRsp);
      break;
    }

    case TMQ_MSG_TYPE__TAOSX_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      STaosxRsp*         pTaosx = &pPoll->taosxRsp;
      taosArrayDestroyP(pTaosx->blockData, taosMemoryFree);
      taosArrayDestroy(pTaosx->blockDataLen);
      taosArrayDestroyP(pTaosx->blockTbName, taosMemoryFree);
      taosArrayDestroyP(pTaosx->blockSchema, (FDelete)tDeleteSSchemaWrapper);
      // taosx
      taosArrayDestroy(pTaosx->createTableLen);
      taosArrayDestroyP(pTaosx->createTableReq, taosMemoryFree);
      break;
    }

    default:
      break;
  }
}

L
Liu Jicong 已提交
873
/*
 * Discard every response that has not been handed to the user yet.
 *
 * Pass 0 drains whatever is already staged in tmq->qall; pass 1 moves the
 * content of tmq->mqueue into tmq->qall and drains that as well. For each item
 * the payload is released first (tmqFreeRspWrapper) and then the queue item.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  for (int32_t pass = 0; pass < 2; ++pass) {
    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }

    for (;;) {
      // Reset on every iteration: if taosGetQitem leaves the out-pointer
      // untouched on an empty qall, a stale pointer to the item freed in the
      // previous iteration must never be freed a second time.
      SMqRspWrapper* rspWrapper = NULL;
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) {
        break;
      }

      tmqFreeRspWrapper(rspWrapper);
      taosFreeQitem(rspWrapper);
    }
  }
}

D
dapan1121 已提交
898
/*
 * Response callback for the subscribe request sent by tmq_subscribe().
 *
 * param  SMqSubscribeCbParam owned by the waiting caller
 * pMsg   response buffer; only its pEpSet is released here
 * code   result code of the request, handed to the waiter through rspErr
 *
 * Always returns 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pCbParam = param;

  taosMemoryFree(pMsg->pEpSet);

  // publish the result before waking the waiter; pCbParam must not be
  // touched after the post
  pCbParam->rspErr = code;
  tsem_post(&pCbParam->rspSem);
  return 0;
}
906

L
Liu Jicong 已提交
907
/*
 * Append the names of the topics currently held in tmq->clientTopics to *topics.
 *
 * tmq     consumer handle
 * topics  in/out list; when *topics is NULL a new list is created
 *
 * Each stored topic name is appended without the prefix that precedes its
 * first '.'; a name without any '.' is appended unchanged.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the list cannot be created.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr() returns NULL when there is no '.', so never do "+ 1" blindly
    const char* dot = strchr(topic->topicName, '.');
    tmq_list_append(*topics, (dot != NULL) ? dot + 1 : topic->topicName);
  }

  return 0;
}

L
Liu Jicong 已提交
918
/*
 * Unsubscribe from all topics by subscribing to an empty topic list.
 *
 * While tmq_subscribe() answers TSDB_CODE_MND_CONSUMER_NOT_READY the request
 * is retried every 500ms; the retry counter is allowed to exceed 5 before the
 * last result is returned as is.
 *
 * Returns the result of the last tmq_subscribe() call, or
 * TSDB_CODE_OUT_OF_MEMORY when the empty list cannot be allocated.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t rsp = 0;
  int32_t retryCnt = 0;
  for (;;) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    }

    retryCnt++;
    taosMsleep(500);
  }

  tmq_list_destroy(lst);
  return rsp;
}

936 937
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
938

939
  // TODO stop timer
L
Liu Jicong 已提交
940 941 942 943
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
L
Liu Jicong 已提交
944

H
Haojun Liao 已提交
945 946 947 948 949
  if (tmq->delayedTask) {
    taosCloseQueue(tmq->delayedTask);
  }

  taosFreeQall(tmq->qall);
950
  tsem_destroy(&tmq->rspSem);
H
Haojun Liao 已提交
951
  taosThreadMutexDestroy(&tmq->lock);
L
Liu Jicong 已提交
952

953 954 955
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
956
    taosMemoryFreeClear(pTopic->schema.pSchema);
957 958
    taosArrayDestroy(pTopic->vgs);
  }
H
Haojun Liao 已提交
959

960 961 962
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
963 964
}

965 966 967 968 969 970 971 972 973
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
974
  if (tmqMgmt.rsetId < 0) {
975 976 977 978
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
979
/*
 * Create a consumer from a configuration object.
 *
 * conf       consumer configuration (group id, client id, credentials, ...)
 * errstr     not written by this function
 * errstrLen  not used by this function
 *
 * Returns the new consumer, or NULL on failure. terrno is set explicitly for
 * module-init and allocation failures; for the other failure paths it is
 * whatever the failing callee left in it.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  // assign consumerId first so that every log line below carries it
  pTmq->consumerId = tGenIdPI64();

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->delayedTask = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  // from here on every failure path must destroy the mutex again (see FAIL)
  taosThreadMutexInit(&pTmq->lock, NULL);
  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
      conf->groupId[0] == 0) {
    // NOTE(review): an empty group id is also reported as out-of-memory;
    // kept as is because callers may depend on the code.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId is not filled in yet, so report the configured group id
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             conf->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf; bounded copies so that a missing terminator in conf cannot
  // overrun the destination arrays
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    // everything is initialised at this point, so the regular destructor applies
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pTmq->hbBgEnable) {
    // the timer callback receives its own heap copy of the reference id
    int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      // dropping the only reference destroys the consumer through tmqFreeImpl
      taosRemoveRef(tmqMgmt.rsetId, pTmq->refId);
      return NULL;
    }

    *pRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
  }

  tscInfo("consumer:0x%" PRIx64 " is setup, groupId:%s", pTmq->consumerId, pTmq->groupId);
  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosThreadMutexDestroy(&pTmq->lock);
  taosMemoryFree(pTmq);

  return NULL;
}

L
Liu Jicong 已提交
1074
/*
 * Subscribe the consumer to the given list of topics (an empty list means
 * "unsubscribe from everything", see tmq_unsubscribe()).
 *
 * The request is sent to the mnode and this function blocks until
 * tmqSubscribeCb() reports the result. Afterwards the endpoint information is
 * fetched with tmqAskEp(), retrying every 500ms while the mnode answers
 * TSDB_CODE_MND_CONSUMER_NOT_READY, and the ask-ep / auto-commit timers are
 * started if they are not running yet.
 *
 * Returns 0 on success, otherwise a non-zero code: the error reported by the
 * server, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_MND_CONSUMER_NOT_READY when the
 * retries are exhausted, or -1 when the response semaphore cannot be created.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // translate every user-visible topic name into its full (account-qualified) name
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      // not stored in the array, so FAIL would not release it
      taosMemoryFree(topicFName);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // lives on this stack frame: valid because we block on rspSem below until
  // the callback has finished using it
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = -1;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 40) {
      // giving up must not be reported as success
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer; the timer callback receives its own heap copy of the ref id
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

  // the success path falls through: the cleanup below is shared
FAIL:
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1198
/*
 * Store the commit callback and its user parameter in the configuration.
 * tmq_consumer_new() copies both into the consumer it creates.
 *
 * conf   configuration to update
 * cb     callback to store
 * param  opaque pointer stored next to the callback
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1202

D
dapan1121 已提交
1203
/*
 * Response callback of a poll request sent by doTmqPollImpl().
 *
 * param  heap-allocated SMqPollCbParam; always released here
 * pMsg   response buffer; pData and pEpSet are always released here
 * code   result code of the request
 *
 * On success the decoded response is wrapped into a SMqPollRspWrapper, pushed
 * to tmq->mqueue and the consumer is woken through tmq->rspSem.
 *
 * Returns 0 when the response was queued or deliberately dropped because it
 * belongs to an earlier epoch, -1 otherwise (terrno is set to
 * TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer no longer exists).
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  int64_t         refId = pParam->refId;
  int32_t         epoch = pParam->epoch;
  int32_t         vgId = pParam->vgId;
  uint64_t        requestId = pParam->requestId;

  // Everything needed has been copied out. pParam->rspSem is not initialised
  // by the sender in doTmqPollImpl, so it must not be destroyed here.
  taosMemoryFree(pParam);

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // From here on every exit path must release the reference acquired above.

  if (code != 0) {
    tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
            vgId, epoch, tstrerror(code), requestId);

    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

    // in case of consumer mismatch, wait for 500ms and retry
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      taosMsleep(500);
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
               tmq->consumerId);
    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pRspWrapper == NULL) {
        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
                tmq->consumerId, vgId, epoch, requestId);
        goto CREATE_MSG_FAIL;
      }

      // an END marker is queued in place of a data response
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }

    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("consumer:0x%" PRIx64
            " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, tmqEpoch, requestId);

    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosReleaseRef(tmqMgmt.rsetId, refId);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, tmqEpoch, requestId);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId,
            epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // In each branch the body is decoded first and the SMqRspHead is then copied
  // over the start of the decoded struct.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%" PRId64
             " type %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType, requestId);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId,
           tmq->mqueue->numOfItems, requestId);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;

CREATE_MSG_FAIL:
  // pVg is only touched when the epoch is unchanged since the request was sent
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return -1;
}

H
Haojun Liao 已提交
1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348
/*
 * Fill a client-side topic entry from one topic of an ask-ep response.
 *
 * pTopic            entry to fill
 * pTopicEp          topic from the response; its schema is moved into pTopic
 * pVgOffsetHashMap  offsets of the previously known vgroups, keyed by the
 *                   string built by makeTopicVgroupKey()
 * tmq               consumer; provides the offset type used for unknown vgroups
 */
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
                                   tmq_t* tmq) {
  // move the schema: afterwards the response no longer references it
  pTopic->schema = pTopicEp->schema;
  pTopicEp->schema.pSchema = NULL;
  pTopicEp->schema.nCols = 0;

  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);

  int32_t numOfVgs = taosArrayGetSize(pTopicEp->vgs);
  tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgs);
  pTopic->vgs = taosArrayInit(numOfVgs, sizeof(SMqClientVg));

  char key[TSDB_TOPIC_FNAME_LEN + 22];
  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SMqSubVgEp* pEp = taosArrayGet(pTopicEp->vgs, idx);

    makeTopicVgroupKey(key, pTopic->topicName, pEp->vgId);
    STqOffsetVal* pKnown = taosHashGet(pVgOffsetHashMap, key, strlen(key));

    // continue from the remembered offset when there is one, otherwise start
    // from the configured reset offset type
    STqOffsetVal startOffset = {.type = tmq->resetOffsetCfg};
    if (pKnown != NULL) {
      startOffset = *pKnown;
    }

    SMqClientVg vg = {
        .pollCnt = 0,
        .currentOffset = startOffset,
        .vgId = pEp->vgId,
        .epSet = pEp->epSet,
        .vgStatus = TMQ_VG_STATUS__IDLE,
        .vgSkipCnt = 0,
    };

    taosArrayPush(pTopic->vgs, &vg);
  }
}

static void freeClientVgInfo(void* param) {
  SMqClientTopic* pTopic = param;
  if (pTopic->schema.nCols) {
    taosMemoryFreeClear(pTopic->schema.pSchema);
  }

  taosArrayDestroy(pTopic->vgs);
}

/*
 * Replace the consumer's topic/vgroup layout with the one from an ask-ep
 * response, carrying over the current offset of every vgroup that is still
 * present (matched by the key built by makeTopicVgroupKey()).
 *
 * tmq    consumer to update
 * epoch  epoch of the response; stored as the new consumer epoch
 * pRsp   decoded response; the schemas of its topics are moved out
 *
 * Always returns false, both on allocation failure and after a completed update.
 */
static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  int32_t numOfExisting = taosArrayGetSize(tmq->clientTopics);
  int32_t numOfIncoming = taosArrayGetSize(pRsp->topics);

  tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
           tmq->consumerId, tmq->epoch, epoch, numOfIncoming, numOfExisting);

  SArray* pIncoming = taosArrayInit(numOfIncoming, sizeof(SMqClientTopic));
  if (pIncoming == NULL) {
    return false;
  }

  SHashObj* pOffsets = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pOffsets == NULL) {
    taosArrayDestroy(pIncoming);
    return false;
  }

  // todo extract method
  // step 1: remember the current offset of every known vgroup
  char key[TSDB_TOPIC_FNAME_LEN + 22];
  for (int32_t t = 0; t < numOfExisting; ++t) {
    SMqClientTopic* pOld = taosArrayGet(tmq->clientTopics, t);
    if (pOld->vgs == NULL) {
      continue;
    }

    int32_t numOfVgs = taosArrayGetSize(pOld->vgs);
    tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, numOfVgs);

    for (int32_t v = 0; v < numOfVgs; ++v) {
      SMqClientVg* pOldVg = taosArrayGet(pOld->vgs, v);
      makeTopicVgroupKey(key, pOld->topicName, pOldVg->vgId);

      char offsetStr[80];
      tFormatOffset(offsetStr, 80, &pOldVg->currentOffset);
      tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
               pOldVg->vgId, key, offsetStr);
      taosHashPut(pOffsets, key, strlen(key), &pOldVg->currentOffset, sizeof(STqOffsetVal));
    }
  }

  // step 2: build the new topic entries, re-using remembered offsets
  for (int32_t t = 0; t < numOfIncoming; ++t) {
    SMqSubTopicEp* pEp = taosArrayGet(pRsp->topics, t);
    SMqClientTopic entry = {0};
    initClientTopicFromRsp(&entry, pEp, pOffsets, tmq);
    taosArrayPush(pIncoming, &entry);
  }

  taosHashCleanup(pOffsets);

  // step 3: swap the topic array under the consumer lock
  taosThreadMutexLock(&tmq->lock);
  if (tmq->clientTopics != NULL) {
    // destroy current buffered existed topics info
    taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
  }
  tmq->clientTopics = pIncoming;
  taosThreadMutexUnlock(&tmq->lock);

  int8_t newStatus;
  if (numOfIncoming == 0) {
    newStatus = TMQ_CONSUMER_STATUS__NO_TOPIC;
  } else {
    newStatus = TMQ_CONSUMER_STATUS__READY;
  }
  atomic_store_8(&tmq->status, newStatus);
  atomic_store_32(&tmq->epoch, epoch);

  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
  return false;
}

H
Haojun Liao 已提交
1447
/*
 * Response callback of an ask-ep request.
 *
 * param  SMqAskEpCbParam; freed here for async requests, otherwise its rspSem
 *        is posted so the waiting requester can continue
 * pMsg   response buffer; pData and pEpSet are always released here
 * code   result code of the request; stored in pParam->code
 *
 * A response whose epoch is not newer than the consumer's is ignored. A newer
 * one is applied directly (sync) or queued to tmq->mqueue as an EP_RSP wrapper
 * (async).
 *
 * Returns `code`, or -1 when the consumer no longer exists or the async
 * wrapper cannot be allocated.
 */
static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  int64_t          refId = pParam->refId;  // pParam may be gone by the time the ref is released
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  if (tmq == NULL) {
    // NOTE(review): in the sync case the semaphore is destroyed rather than
    // posted; confirm against the requester that nobody is waiting on it.
    if (!async) {
      tsem_destroy(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  pParam->code = code;
  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, async,
             tstrerror(code));
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  if (head->epoch <= epoch) {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
             tmq->consumerId, head->epoch, epoch);
    goto END;
  }

  tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
           head->epoch, epoch);

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // pParam must not be touched after this post
    tsem_post(&pParam->rspSem);
  } else {
    taosMemoryFree(pParam);
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  // balance the taosAcquireRef at the top of the function
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return code;
}

L
Liu Jicong 已提交
1519
/*
 * Fill a poll request for one vgroup of one topic.
 *
 * pReq     request to fill
 * tmq      consumer issuing the request
 * timeout  stored unchanged in the request
 * pTopic   topic being polled
 * pVg      vgroup being polled; supplies the vgroup id and the request offset
 */
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // subKey is "<groupId><TMQ_SEPARATOR><topicName>"
  int32_t groupLen = strlen(tmq->groupId);
  char*   pos = pReq->subKey;
  memcpy(pos, tmq->groupId, groupLen);
  pos += groupLen;
  *pos = TMQ_SEPARATOR;
  strcpy(pos + 1, pTopic->topicName);

  // addressing
  pReq->head.vgId = pVg->vgId;
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;

  // what to read and how
  pReq->reqOffset = pVg->currentOffset;
  pReq->timeout = timeout;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;

  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1536 1537
/*
 * Build the user-facing meta result object from a queued poll response.
 * The metaRsp struct is copied bytewise, so the pointers inside it are shared
 * with the wrapper rather than duplicated.
 *
 * Returns the new object, or NULL when it cannot be allocated.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1547 1548 1549
/*
 * Build the user-facing data result object from a queued poll response.
 * The dataRsp struct is copied bytewise, so the arrays it points to are shared
 * with the wrapper rather than duplicated. When the response carries no
 * schema, the schema of the topic is installed in the result info.
 *
 * Returns the new object, or NULL when it cannot be allocated.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1565 1566
/*
 * Build the user-facing taosx (meta + data) result object from a queued poll
 * response. The taosxRsp struct is copied bytewise, so the arrays it points to
 * are shared with the wrapper rather than duplicated. When the response
 * carries no schema, the schema of the topic is installed in the result info.
 *
 * Returns the new object, or NULL when it cannot be allocated.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615
/*
 * Common failure exit of doTmqPollImpl() for errors that happen before the
 * poll request is sent: marks the vgroup idle again and posts the consumer's
 * response semaphore.
 *
 * Always returns -1.
 */
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
  const int32_t failure = -1;

  // set the status before posting the semaphore
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&pTmq->rspSem);

  return failure;
}

// Serialize and send one poll (consume) request for the given topic/vgroup pair.
//
// On any failure before the request is handed to the transport, every buffer allocated so far is freed and
// handleErrorBeforePoll() is used to reset the vgroup state, so -1 is returned. On success the per-vgroup and
// per-consumer poll counters are incremented and TSDB_CODE_SUCCESS is returned; the response is delivered to
// tmqPollCb together with the SMqPollCbParam built here.
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq req = {0};
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first pass with a NULL buffer only computes the encoded size
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    return handleErrorBeforePoll(pVg, pTmq);
  }

  char* msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return handleErrorBeforePoll(pVg, pTmq);
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  // zero-initialized so that any field not assigned below has a defined value
  SMqPollCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  pParam->pVg = pVg;  // TODO: pVg may already be released when the callback uses it
  pParam->pTopic = pTopic;
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = msg,
      .len = msgSize,
      .handle = NULL,
  };

  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  int64_t transporterId = 0;
  char    offsetFormatBuf[80];
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->currentOffset);

  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
           pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
  // NOTE(review): the return value of asyncSendMsgToServer is not checked here — confirm whether a send
  // failure needs the vgroup status to be reset
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

  pVg->pollCnt++;
  pTmq->pollCnt++;

  return TSDB_CODE_SUCCESS;
}

1654
// broadcast the poll request to all related vnodes
H
Haojun Liao 已提交
1655
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1656
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1657
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
1658 1659

  for (int i = 0; i < numOfTopics; i++) {
X
Xiaoyu Wang 已提交
1660
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
X
Xiaoyu Wang 已提交
1661
    int32_t         numOfVg = taosArrayGetSize(pTopic->vgs);
1662 1663

    for (int j = 0; j < numOfVg; j++) {
X
Xiaoyu Wang 已提交
1664 1665
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1666
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1667
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
H
Haojun Liao 已提交
1668
        tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
X
Xiaoyu Wang 已提交
1669
                 pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1670
        continue;
L
temp  
Liu Jicong 已提交
1671 1672 1673 1674
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
1675
        tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1676 1677
        }
#endif
X
Xiaoyu Wang 已提交
1678
      }
1679

L
Liu Jicong 已提交
1680
      atomic_store_32(&pVg->vgSkipCnt, 0);
1681 1682 1683
      int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
D
dapan1121 已提交
1684
      }
X
Xiaoyu Wang 已提交
1685 1686
    }
  }
1687

X
Xiaoyu Wang 已提交
1688 1689 1690
  return 0;
}

H
Haojun Liao 已提交
1691
// Handle a response that is not a poll response. Only TMQ_MSG_TYPE__EP_RSP is recognized.
//
// - If the response epoch is newer than the consumer's epoch, the endpoint set is updated through
//   tmqUpdateEp(), the decoded message is released with tDeleteSMqAskEpRsp() and *pReset is set to true.
// - Otherwise the response is stale: it is released with tmqFreeRspWrapper() and *pReset is set to false.
//
// Returns 0 for an EP response, -1 for any other type (in which case *pReset is left untouched).
// This function never calls taosFreeQitem() on rspWrapper itself.
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
    SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;
    tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
    tDeleteSMqAskEpRsp(rspMsg);
    *pReset = true;
  } else {
    tmqFreeRspWrapper(rspWrapper);
    *pReset = false;
  }

  return 0;
}

H
Haojun Liao 已提交
1711
// Consume queued responses until one of them yields a result object for the caller.
//
// Items are taken from tmq->qall; when it is empty it is refilled once from tmq->mqueue. The loop ends when:
//  - both queues are empty                       -> returns NULL
//  - a TMQ_MSG_TYPE__END_RSP item is found       -> returns NULL with terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET
//  - a data / meta / taosx poll response whose epoch matches the consumer's epoch can be turned into a
//    result object                               -> returns that object (SMqRspObj, SMqMetaRspObj or
//                                                   SMqTaosxRspObj)
//
// Poll responses with a mismatching epoch are discarded. Data and taosx responses with blockNum == 0 are
// dropped after the vgroup offset/status has been updated. Any other response type is passed to
// tmqHandleNoPollRsp(); if it reports a reset and pollIfReset is true, a new round of polls is issued.
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);

  // the loop is only left through the return statements inside it
  while (1) {
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);

    if (rspWrapper == NULL) {
      // local batch exhausted: pull everything pending from the shared queue and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);

      if (rspWrapper == NULL) {
        return NULL;
      }
    }

    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
      return NULL;
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;

      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
        // the vgroup may be polled again from now on
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        if (pollRspWrapper->dataRsp.blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d", tmq->consumerId, pVg->vgId);
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
          continue;
        }

        // build rsp
        char buf[80];
        tFormatOffset(buf, 80, &pVg->currentOffset);
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d", tmq->consumerId,
                 pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum);

        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
          continue;
        }

        // build rsp: without created tables the response is exposed as a plain data result
        void* pRsp = NULL;
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        } else {
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
        }
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else {
      // not a poll response; the handler's return code is ignored and the item is freed either way
      bool reset = false;
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
    }
  }
}

1832
// Poll the consumer for the next result.
//
// timeout is compared against millisecond timestamps; -1 means "no deadline", in which case the loop keeps
// waiting on the response semaphore in 1000 ms slices.
//
// Returns a result object, or NULL when:
//  - the consumer is still in TMQ_CONSUMER_STATUS__INIT (after sleeping 500 ms),
//  - the consumer is in TMQ_CONSUMER_STATUS__RECOVER and tmqAskEp() keeps answering
//    TSDB_CODE_MND_CONSUMER_NOT_READY after the retry budget is used up,
//  - terrno is TSDB_CODE_TQ_NO_COMMITTED_OFFSET after handling responses,
//  - the timeout elapses without a result.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj = NULL;
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 40) {
        return NULL;
      }

      tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);

    // a poll error is only logged; the loop still goes on to handle whatever responses are queued
    if (tmqPollImpl(tmq, timeout) < 0) {
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout != -1) {
      int64_t currentTime = taosGetTimestampMs();
      int64_t elapsedTime = currentTime - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
                 tmq->consumerId, tmq->epoch, startTime, currentTime);
        return NULL;
      }
      // wait for a response for at most the remaining part of the timeout
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
    }
  }
}

L
Liu Jicong 已提交
1902
// Close a consumer.
//
// When the consumer is in TMQ_CONSUMER_STATUS__READY:
//  1. tmq_commit_sync() is called with a NULL message; a non-zero result is returned immediately and the
//     consumer reference is NOT removed.
//  2. tmq_subscribe() is called with a freshly created list that has had nothing appended to it, retrying
//     every 500 ms while the result is TSDB_CODE_MND_CONSUMER_NOT_READY and the retry budget is not used up.
//     The final result of that call is not propagated.
// In all remaining cases the consumer reference is removed from the ref set and 0 is returned.
int32_t tmq_consumer_close(tmq_t* tmq) {
  // read atomically, consistent with how tmq_consumer_poll() reads the status
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      } else {
        retryCnt++;
        taosMsleep(500);
      }
    }

    tmq_list_destroy(lst);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1927

L
Liu Jicong 已提交
1928 1929
// Translate an error code into a human-readable string: 0 maps to "success", -1 to "fail", and every other
// value is delegated to tstrerror().
const char* tmq_err2str(int32_t err) {
  if (err == 0) {
    return "success";
  } else if (err == -1) {
    return "fail";
  } else {
    return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1937

L
Liu Jicong 已提交
1938 1939 1940 1941 1942
// Classify a result object using the TD_RES_TMQ* checks:
// data -> TMQ_RES_DATA, meta -> TMQ_RES_TABLE_META, metadata -> TMQ_RES_METADATA, anything else ->
// TMQ_RES_INVALID.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  } else if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  } else {
    return TMQ_RES_INVALID;
  }
}

L
Liu Jicong 已提交
1950
// Return the topic name of a TMQ result with everything up to and including the first '.' stripped.
// If the stored name contains no '.', the whole stored name is returned (the previous code computed
// strchr(...) + 1 on a NULL result in that case, which is undefined behavior).
// Returns NULL when res is not a TMQ data / meta / metadata result. The returned pointer refers to storage
// inside res.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    fullName = pRspObj->topic;
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    fullName = pMetaRspObj->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
    fullName = pRspObj->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1965 1966 1967 1968
// Return the database name of a TMQ result with everything up to and including the first '.' stripped.
// If the stored name contains no '.', the whole stored name is returned (the previous code computed
// strchr(...) + 1 on a NULL result in that case, which is undefined behavior).
// Returns NULL when res is not a TMQ data / meta / metadata result. The returned pointer refers to storage
// inside res.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    fullName = pRspObj->db;
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    fullName = pMetaRspObj->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
    fullName = pRspObj->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1980 1981 1982 1983
// Return the vgroup id stored in a TMQ data / meta / metadata result, or -1 for any other kind of result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
    return pRspObj->vgId;
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
1994 1995 1996 1997 1998 1999 2000 2001

// Return the table name of the block currently selected by resIter in a TMQ data or metadata result.
// Returns NULL when the result is of another kind, when the response carries no table names
// (withTbName unset or blockTbName NULL), or when resIter is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
  }
  return NULL;
}
2013

L
Liu Jicong 已提交
2014
// Asynchronous commit entry point: forwards to tmqCommitInner() with the async flag set to 1 and the
// caller's callback and user parameter. The return value of tmqCommitInner() is not used.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

2018
// Synchronous commit entry point: forwards to tmqCommitInner() with the async flag set to 0 and no callback,
// and returns its result.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
}
2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140

// Ask the mnode for the consumer's current endpoint assignment.
//
// The request carries the consumer id, the consumer's current epoch and its group id; the response is
// delivered to tmqAskEpCb together with the SMqAskEpCbParam built here.
//
// async == false: blocks on the parameter's semaphore until it is posted, then returns the code stored in
//                 the parameter and frees the parameter.
// async == true : returns TSDB_CODE_SUCCESS right after the request has been handed to the transport.
//
// Returns -1 if the request cannot be serialized or a buffer cannot be allocated.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = TSDB_CODE_SUCCESS;

  SMqAskEpReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
  // bounded copy: never write past req.cgroup, always NUL-terminate
  tstrncpy(req.cgroup, tmq->groupId, sizeof(req.cgroup));

  // first pass with a NULL buffer only computes the encoded size
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
    return -1;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
    taosMemoryFree(pReq);
    return -1;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
    taosMemoryFree(pReq);
    return -1;
  }

  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;
  tsem_init(&pParam->rspSem, 0, 0);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
           sendInfo->requestId);

  int64_t transporterId = 0;
  // NOTE(review): the return value of asyncSendMsgToServer is not checked; in the synchronous case a failed
  // send would presumably leave tsem_wait() below blocked — confirm against the transport's contract
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // NOTE(review): rspSem is not destroyed here before the parameter is freed — verify whether tmqAskEpCb
    // takes care of it
    taosMemoryFree(pParam);
  }

  return code;
}

// Format "<topicName>:<vg>" into dst and return the number of characters written (excluding the
// terminating NUL), as reported by sprintf(). dst must be large enough; no bound is enforced here.
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

// Finish a commit once no more responses are pending for the given parameter set.
//
// The consumer is looked up by the ref id stored in the parameter set:
//  - consumer gone : the semaphore is destroyed for synchronous commits, the parameter set is freed,
//                    terrno is set to TSDB_CODE_TMQ_CONSUMER_CLOSED and -1 is returned.
//  - async commit  : the consumer-level callback (automatic commits) or the per-call user callback is
//                    invoked with the stored response error, then the parameter set is freed.
//  - sync commit   : the semaphore is posted; the parameter set is not freed here.
// The reference taken on the consumer is released before returning 0.
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // remember the id now: pParamSet must not be touched after it has been freed or its semaphore posted
  int64_t refId = pParamSet->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    if (!pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
    }
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // if no more waiting rsp
  if (pParamSet->async) {
    // call async cb func
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

  // balance the taosAcquireRef() above; without it the consumer's reference count never drops
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}