clientTmq.c 67.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22
#include "tdef.h"
#include "tglobal.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

X
Xiaoyu Wang 已提交
27
struct SMqMgmt {
28 29 30
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
31
};
L
Liu Jicong 已提交
32

X
Xiaoyu Wang 已提交
33 34
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
35
static struct SMqMgmt tmqMgmt = {0};
36

L
Liu Jicong 已提交
37 38 39 40 41 42
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
43 44 45
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
46 47
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
48
struct tmq_list_t {
L
Liu Jicong 已提交
49
  SArray container;
L
Liu Jicong 已提交
50
};
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
struct tmq_conf_t {
53 54 55 56 57 58 59 60
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
61 62 63 64 65
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
66
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
67
  void*          commitCbUserParam;
L
Liu Jicong 已提交
68 69 70
};

struct tmq_t {
71
  int64_t refId;
L
Liu Jicong 已提交
72
  // conf
73 74 75 76 77 78 79 80 81
  char     groupId[TSDB_CGROUP_LEN];
  char     clientId[256];
  int8_t   withTbName;
  int8_t   useSnapshot;
  int8_t   autoCommit;
  int32_t  autoCommitInterval;
  int32_t  resetOffsetCfg;
  uint64_t consumerId;
  bool     hbBgEnable;
82

L
Liu Jicong 已提交
83 84
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
85 86 87 88

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
89 90
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
91
  int32_t epSkipCnt;
L
Liu Jicong 已提交
92
#endif
L
Liu Jicong 已提交
93 94
  int64_t pollCnt;

L
Liu Jicong 已提交
95
  // timer
96 97
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
98 99 100
  tmr_h reportTimer;
  tmr_h commitTimer;

H
Haojun Liao 已提交
101 102 103 104 105 106 107
  STscObj*      pTscObj;       // connection
  SArray*       clientTopics;  // SArray<SMqClientTopic>
  STaosQueue*   mqueue;        // queue of rsp
  STaosQall*    qall;
  STaosQueue*   delayedTask;   // delayed task queue for heartbeat and auto commit
  TdThreadMutex lock;          // used to protect the operation on each topic, when updating the epsets.
  tsem_t        rspSem;
L
Liu Jicong 已提交
108 109
};

X
Xiaoyu Wang 已提交
110 111 112 113 114 115 116 117
/* Per-vgroup request state (presumably stored in SMqClientVg::vgStatus). */
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT = 1,
};

/* Consumer lifecycle state. */
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY = 1,
  TMQ_CONSUMER_STATUS__NO_TOPIC = 2,
  TMQ_CONSUMER_STATUS__RECOVER = 3,
};

/* Work items the timer callbacks push onto tmq->delayedTask. */
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT = 2,
  TMQ_DELAYED_TASK__COMMIT = 3,
};

L
Liu Jicong 已提交
128
typedef struct {
129 130 131
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
132 133
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
134
  // connection info
135
  int32_t vgId;
X
Xiaoyu Wang 已提交
136
  int32_t vgStatus;
L
Liu Jicong 已提交
137
  int32_t vgSkipCnt;
138 139 140
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
141
typedef struct {
142 143 144
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
145
  SSchemaWrapper schema;
146 147
} SMqClientTopic;

L
Liu Jicong 已提交
148 149 150 151 152
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
153
  union {
L
Liu Jicong 已提交
154 155
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
156
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
157
  };
L
Liu Jicong 已提交
158 159
} SMqPollRspWrapper;

L
Liu Jicong 已提交
160
typedef struct {
161 162
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
163 164
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
165
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
166

L
Liu Jicong 已提交
167
typedef struct {
168 169
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
170
  int32_t code;
L
Liu Jicong 已提交
171
  int32_t async;
X
Xiaoyu Wang 已提交
172
  tsem_t  rspSem;
173 174
} SMqAskEpCbParam;

L
Liu Jicong 已提交
175
typedef struct {
176 177
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
178
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
179
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
180
  int32_t         vgId;
L
Liu Jicong 已提交
181
  tsem_t          rspSem;
H
Haojun Liao 已提交
182
  uint64_t        requestId; // request id for debug purpose
X
Xiaoyu Wang 已提交
183
} SMqPollCbParam;
184

185
typedef struct {
186 187
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
188 189
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
190 191
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
192
  int32_t        rspErr;
193
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
194 195 196 197
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
198 199 200 201 202
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
H
Haojun Liao 已提交
203 204 205
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
  int32_t              vgId;
  tmq_t*               pTmq;
206
} SMqCommitCbParam;
207

208
static int32_t tmqAskEp(tmq_t* tmq, bool async);
209 210
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
211 212
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
                               int32_t index, int32_t totalVgroups);
213
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
214

215
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
216
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
217 218 219 220 221
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

222
  conf->withTbName = false;
L
Liu Jicong 已提交
223
  conf->autoCommit = true;
L
Liu Jicong 已提交
224
  conf->autoCommitInterval = 5000;
225
  conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
226
  conf->hbBgEnable = true;
227

228 229 230
  return conf;
}

L
Liu Jicong 已提交
231
/* Release a configuration together with the connection strings it owns; NULL is ignored. */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }
  // taosMemoryFree tolerates NULL, so unset strings need no guard
  taosMemoryFree(conf->ip);
  taosMemoryFree(conf->user);
  taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
247
  if (strcasecmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
248
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
249
    return TMQ_CONF_OK;
250
  }
L
Liu Jicong 已提交
251

252
  if (strcasecmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
253
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
254 255
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
256

257 258
  if (strcasecmp(key, "enable.auto.commit") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
259
      conf->autoCommit = true;
L
Liu Jicong 已提交
260
      return TMQ_CONF_OK;
261
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
262
      conf->autoCommit = false;
L
Liu Jicong 已提交
263 264 265 266
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
267
  }
L
Liu Jicong 已提交
268

269
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
L
Liu Jicong 已提交
270 271 272 273
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

274 275 276
  if (strcasecmp(key, "auto.offset.reset") == 0) {
    if (strcasecmp(value, "none") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
L
Liu Jicong 已提交
277
      return TMQ_CONF_OK;
278 279
    } else if (strcasecmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
L
Liu Jicong 已提交
280
      return TMQ_CONF_OK;
281 282
    } else if (strcasecmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
L
Liu Jicong 已提交
283 284 285 286 287
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
288

289 290
  if (strcasecmp(key, "msg.with.table.name") == 0) {
    if (strcasecmp(value, "true") == 0) {
291
      conf->withTbName = true;
L
Liu Jicong 已提交
292
      return TMQ_CONF_OK;
293
    } else if (strcasecmp(value, "false") == 0) {
294
      conf->withTbName = false;
L
Liu Jicong 已提交
295
      return TMQ_CONF_OK;
296 297 298 299 300
    } else {
      return TMQ_CONF_INVALID;
    }
  }

301 302
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
303
      conf->snapEnable = true;
304
      return TMQ_CONF_OK;
305
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
306
      conf->snapEnable = false;
307 308 309 310 311 312
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

313
  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
L
Liu Jicong 已提交
314 315 316 317
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

318 319
  if (strcasecmp(key, "enable.heartbeat.background") == 0) {
    if (strcasecmp(value, "true") == 0) {
320
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
321
      return TMQ_CONF_OK;
322
    } else if (strcasecmp(value, "false") == 0) {
323
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
324 325 326 327 328 329
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

330
  if (strcasecmp(key, "td.connect.ip") == 0) {
331
    conf->ip = taosStrdup(value);
L
Liu Jicong 已提交
332 333
    return TMQ_CONF_OK;
  }
334
  if (strcasecmp(key, "td.connect.user") == 0) {
335
    conf->user = taosStrdup(value);
L
Liu Jicong 已提交
336 337
    return TMQ_CONF_OK;
  }
338
  if (strcasecmp(key, "td.connect.pass") == 0) {
339
    conf->pass = taosStrdup(value);
L
Liu Jicong 已提交
340 341
    return TMQ_CONF_OK;
  }
342
  if (strcasecmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
343 344 345
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
346
  if (strcasecmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
347 348 349
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
350
  return TMQ_CONF_UNKNOWN;
351 352 353
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
354
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
355 356
}

L
Liu Jicong 已提交
357 358
/*
 * Append a heap copy of topic name `src` to `list`.
 * Names not starting with a backquote are lower-cased; backquoted names are kept verbatim.
 * Returns 0 on success. Returns -1 when:
 *   - list is NULL, or src is NULL or empty;
 *   - an allocation fails (the copy is never leaked).
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = taosStrdup(src);
  if (topic == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  SArray* container = &list->container;
  if (taosArrayPush(container, &topic) == NULL) {
    taosMemoryFree(topic);  // the list did not take ownership
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
368
/* Destroy a topic list, releasing every stored topic string with taosMemoryFree. */
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
373 374 375 376 377 378 379 380 381 382
/* Number of topic names currently held in the list. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/* Expose the list's internal element storage as a `char*` array; no copy is made. */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

383 384 385 386 387
/*
 * Find vgroup `vgId` of topic `pName` in pTopicList (an SArray of SMqClientTopic).
 *
 * On a hit, returns the vgroup and stores its position in the topic's `vgs` array in *index.
 * On a miss, returns NULL and *index is -1.
 * *numOfVgroups receives the vgroup count of the last topic whose name matched, or 0 if none did.
 */
static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
                                  int32_t* numOfVgroups) {
  *index = -1;
  *numOfVgroups = 0;

  int32_t topicCount = taosArrayGetSize(pTopicList);
  for (int32_t t = 0; t < topicCount; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(pTopicList, t);
    if (strcmp(pTopic->topicName, pName) == 0) {
      *numOfVgroups = taosArrayGetSize(pTopic->vgs);
      for (int32_t v = 0; v < *numOfVgroups; ++v) {
        SMqClientVg* pCandidate = taosArrayGet(pTopic->vgs, v);
        if (pCandidate->vgId == vgId) {
          *index = v;
          return pCandidate;
        }
      }
    }
  }

  return NULL;
}
406

H
Haojun Liao 已提交
407
/*
 * Response callback for a TDMT_VND_TMQ_COMMIT_OFFSET request built by doSendCommitMsg().
 *
 * Failure: the commit is re-sent if this consumer still owns the vgroup; otherwise it is
 *   dropped. Note that retries are unbounded.
 * Success: the vgroup's epset is refreshed when the server returned one, and the
 *   vgroup's committedOffset becomes the offset this request actually carried.
 *
 * In every case the request/response buffers are freed, and the owning parameter set
 * is counted down once. The count-down runs after the consumer lock is released.
 */
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
  tmq_t*               pTmq = pParam->pTmq;  // todo replace the pTmq with refId
  int32_t              index = 0;
  int32_t              numOfVgroups = 0;

  taosThreadMutexLock(&pTmq->lock);
  SMqClientVg* pVg = foundClientVg(pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);

  if (code != TSDB_CODE_SUCCESS) {  // if commit offset failed, let's try again
    if (pVg == NULL) {
      tscDebug("consumer:0x%" PRIx64
               " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
               pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
    } else {  // let's retry the commit
      int32_t code1 = doSendCommitMsg(pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
      if (code1 != TSDB_CODE_SUCCESS) {  // retry failed.
        tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
                 " retry failed, ignore this commit. code:%s ordinal:%d/%d",
                 pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
                 tstrerror(terrno), index + 1, numOfVgroups);
      }
    }
  } else if (pVg == NULL) {
    tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d has been transferred to other consumer, ordinal:%d/%d",
             pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
  } else {
    if (pBuf->pEpSet != NULL) {  // the server reported a newer epset for this vgroup
      SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
      SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));

      tscDebug("consumer:0x%" PRIx64 " subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d, ordinal:%d/%d",
               pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port,
               index + 1, numOfVgroups);

      pVg->epSet = *pBuf->pEpSet;
    }

    // Record the offset this request carried, not currentOffset: consumption may have
    // advanced while the request was in flight. Copying currentOffset would mark an
    // uncommitted position as committed, and the next commit would then skip it.
    pVg->committedOffset = pParam->pOffset->val;
    tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d, commit offset success. ordinal:%d/%d", pTmq->consumerId,
             pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
  }

  taosThreadMutexUnlock(&pTmq->lock);

  taosMemoryFree(pParam->pOffset);
  taosMemoryFree(pBuf->pData);
  taosMemoryFree(pBuf->pEpSet);

  tmqCommitRspCountDown(pParamSet, pTmq->consumerId, pParam->topicName, pParam->vgId);
  return 0;
}

477 478
/*
 * Build and asynchronously send a TDMT_VND_TMQ_COMMIT_OFFSET request carrying
 * pVg->currentOffset for topic pTopicName. The response is handled by tmqCommitCb().
 *
 * index/totalVgroups are used only for logging.
 *
 * On success the request is registered with pParamSet (waitingRspNum and totalRspNum
 * are each incremented) and 0 is returned.
 * On failure every allocation made here is released, pParamSet is left untouched,
 * and -1 is returned; terrno is TSDB_CODE_OUT_OF_MEMORY for allocation failures.
 */
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
                               int32_t index, int32_t totalVgroups) {
  STqOffset*        pOffset = NULL;
  void*             buf = NULL;
  SMqCommitCbParam* pParam = NULL;
  SMsgSendInfo*     pMsgSendInfo = NULL;
  SEp*              pEp = NULL;
  int32_t           groupLen = 0;
  int32_t           len = 0;
  int32_t           code = 0;
  int64_t           transporterId = 0;
  SEncoder          encoder;

  pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    goto NO_MEM;
  }

  pOffset->val = pVg->currentOffset;

  // subscribe key: "<groupId><TMQ_SEPARATOR><topicName>"
  groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopicName);

  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    goto FAIL;
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    goto NO_MEM;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto NO_MEM;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;
  pParam->vgId = pVg->vgId;
  pParam->pTmq = tmq;
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    goto NO_MEM;
  }

  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  pEp = GET_ACTIVE_EP(&pVg->epSet);
  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d, ordinal:%d/%d",
           tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn,
           pEp->port, index + 1, totalVgroups);

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;  // releases pParam; tmqCommitCb() frees pOffset
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;

  // Register with the set only once nothing below can fail, so a failed build never
  // leaves a response the set would wait for forever.
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

NO_MEM:
  terrno = TSDB_CODE_OUT_OF_MEMORY;
FAIL:
  taosMemoryFree(pMsgSendInfo);
  taosMemoryFree(pParam);
  taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

H
Haojun Liao 已提交
564
/*
 * Commit the current offset of the single (topic, vgroup) that produced result `msg`.
 *
 * Returns TSDB_CODE_TMQ_INVALID_MSG if msg is not a TMQ result object.
 * Nothing is sent, and 0 is returned, when:
 *   - the vgroup is not held by this consumer, or
 *   - its current offset is already committed.
 * Synchronous mode (async == 0): blocks until the response arrives and returns its
 *   error code, or -1 if the request could not be sent.
 * Asynchronous mode: returns 0. If the request could not be sent, userCb (when
 *   non-NULL) is invoked with the error.
 *
 * The consumer lock must be released before waiting: tmqCommitCb() acquires the same
 * lock before counting the response down, so waiting while holding it deadlocks.
 */
static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = 0;
  int32_t index = 0;
  int32_t numOfVgroups = 0;

  taosThreadMutexLock(&tmq->lock);
  SMqClientVg* pVg = foundClientVg(tmq->clientTopics, topic, vgId, &index, &numOfVgroups);
  if (pVg != NULL && pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
    if (doSendCommitMsg(tmq, pVg, topic, pParamSet, index, numOfVgroups) < 0) {
      code = -1;
    }
  }
  // Read the counter while the lock is still held. Once the lock is dropped, the
  // response callback can run and, in async mode, complete the set.
  int32_t totalRspNum = pParamSet->totalRspNum;
  taosThreadMutexUnlock(&tmq->lock);

  if (code != 0) {
    // a failed send never registers a response, so pParamSet is still exclusively ours
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    if (async && userCb != NULL) {
      userCb(tmq, code, userParam);
    }
    return async ? 0 : code;
  }

  if (totalRspNum == 0) {  // nothing needed committing
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (async) {
    // completion is handled by the response path (tmqCommitCb -> tmqCommitRspCountDown)
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

L
Liu Jicong 已提交
652 653
/*
 * Commit the current offset of every vgroup of every subscribed topic whose current
 * offset differs from the committed one.
 *
 * `automatic` marks auto-commit requests. In async mode an allocation failure is
 * reported through tmq->commitCb (automatic) or userCb (manual), when that callback
 * is set.
 *
 * Returns:
 *   - 0 when nothing needed committing;
 *   - the server-side result in synchronous mode;
 *   - otherwise the status of the last send attempt;
 *   - -1 on allocation failure.
 */
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                                     void* userParam) {
  int32_t code = -1;

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    if (async) {
      tmq_commit_cb* cb = automatic ? tmq->commitCb : userCb;
      void*          cbParam = automatic ? tmq->commitCbUserParam : userParam;
      if (cb != NULL) {  // no commit callback may have been configured
        cb(tmq, code, cbParam);
      }
    }
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue: responses arriving during the loop below
  // cannot drive the counter to zero and complete the set early
  pParamSet->waitingRspNum = 1;

  taosThreadMutexLock(&tmq->lock);
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);

    tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
             numOfVgroups);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
        if (code != TSDB_CODE_SUCCESS) {
          tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
                   tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->committedOffset.version, tstrerror(terrno),
                   j + 1, numOfVgroups);
          continue;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
      }
    }
  }

  tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum,
           numOfTopics);
  taosThreadMutexUnlock(&tmq->lock);

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // count down since waiting rsp num init as 1
  tmqCommitRspCountDown(pParamSet, tmq->consumerId, "", 0);

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
  }

  return code;
}

737 738
/*
 * Route a commit request:
 *   - without a result object, commit every vgroup of the consumer (the auto-commit
 *     task takes this path);
 *   - with one, commit only the vgroup that produced `msg`.
 */
static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                              void* userParam) {
  if (msg == NULL) {
    return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
  }
  return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
}

746
void tmqAssignAskEpTask(void* param, void* tmrId) {
747 748 749
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
750
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
751 752 753 754 755
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
756 757 758
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
759 760 761
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
762
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
763 764 765 766 767
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
768 769 770
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
771 772 773
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
774
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
775 776 777 778 779
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
780 781
}

782
/* Heartbeat response callback: nothing in the reply is used, so just release its buffers. */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
791 792 793
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
794
    taosMemoryFree(param);
795 796
    return;
  }
D
dapan1121 已提交
797 798 799 800 801

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
802
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
803 804 805 806
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
    return;
  }
L
Liu Jicong 已提交
807
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
808 809 810 811 812 813 814 815 816
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
    return;
  }
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
    return;
  }
817 818 819 820

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
821
    goto OVER;
822 823 824
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
D
dapan1121 已提交
825
      .len = tlen,
826 827 828 829 830 831 832
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
833
  sendInfo->msgType = TDMT_MND_TMQ_HB;
834 835 836 837 838 839 840

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
841
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
842 843
}

844
/*
 * Drain pTmq->delayedTask and run each queued task on the calling thread:
 *   ASK_EP - send an async ask-ep request and re-arm the 1000 ms ask-ep timer;
 *   COMMIT - run an automatic async commit and re-arm the auto-commit timer;
 *   REPORT - currently nothing.
 * If the timer argument cannot be allocated, an error is logged and that periodic
 * task is not re-armed (instead of dereferencing NULL).
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the queue snapshot cannot
 * be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(pTmq, true);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to re-arm ask-ep timer, out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;
        tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to re-arm auto-commit timer, out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;
        tscDebug("consumer:0x%" PRIx64 " commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // reporting is not implemented
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912
/*
 * Release the payload owned by a response wrapper taken from the consumer queue,
 * selected by its tmqRspType. The wrapper item itself is not freed here.
 */
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pEp = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pEp->msg);
      break;
    }
    case TMQ_MSG_TYPE__POLL_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosArrayDestroyP(pPoll->dataRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->dataRsp.blockDataLen);
      taosArrayDestroyP(pPoll->dataRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
      break;
    }
    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFree(pPoll->metaRsp.metaRsp);
      break;
    }
    case TMQ_MSG_TYPE__TAOSX_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
      taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
      // taosx-only: serialized create-table requests and their lengths
      taosArrayDestroy(pPoll->taosxRsp.createTableLen);
      taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
      break;
    }
    case TMQ_MSG_TYPE__END_RSP:
    default:
      // no owned payload
      break;
  }
}

L
Liu Jicong 已提交
913
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
914
  SMqRspWrapper* rspWrapper = NULL;
L
Liu Jicong 已提交
915
  while (1) {
L
Liu Jicong 已提交
916 917 918 919 920
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper) {
      tmqFreeRspWrapper(rspWrapper);
      taosFreeQitem(rspWrapper);
    } else {
L
Liu Jicong 已提交
921
      break;
L
Liu Jicong 已提交
922
    }
L
Liu Jicong 已提交
923 924
  }

L
Liu Jicong 已提交
925
  rspWrapper = NULL;
L
Liu Jicong 已提交
926 927
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
L
Liu Jicong 已提交
928 929 930 931 932
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper) {
      tmqFreeRspWrapper(rspWrapper);
      taosFreeQitem(rspWrapper);
    } else {
L
Liu Jicong 已提交
933
      break;
L
Liu Jicong 已提交
934
    }
L
Liu Jicong 已提交
935 936 937
  }
}

D
dapan1121 已提交
938
/*
 * Transport callback for TDMT_MND_TMQ_SUBSCRIBE. Stores the result code in the
 * SMqSubscribeCbParam that tmq_subscribe() is waiting on and wakes it up.
 * The buffers in pMsg are released here, as every other transport callback in this file does.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  // pData was previously leaked whenever the response carried a body.
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  // Must be the last access to pParam: it lives on tmq_subscribe()'s stack, which may
  // unwind as soon as the semaphore is posted.
  tsem_post(&pParam->rspSem);
  return 0;
}
946

L
Liu Jicong 已提交
947
/*
 * Append the names of the topics the consumer currently holds to *topics, creating the
 * list when *topics is NULL. Each stored topicName is the full name built in tmq_subscribe
 * (account-id prefix, '.', topic); only the part after the first '.' is reported.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or
 * TSDB_CODE_OUT_OF_MEMORY when the list cannot be created.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (tmq == NULL || topics == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // tmqUpdateEp destroys and replaces tmq->clientTopics under this lock.
  taosThreadMutexLock(&tmq->lock);
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    const char*     dot = strchr(topic->topicName, '.');
    // Fall back to the whole name rather than doing arithmetic on a NULL pointer.
    tmq_list_append(*topics, (dot != NULL) ? dot + 1 : topic->topicName);
  }
  taosThreadMutexUnlock(&tmq->lock);

  return 0;
}

L
Liu Jicong 已提交
958
/*
 * Unsubscribe from all topics by subscribing to an empty list.
 * While the mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY the request is retried every
 * 500ms (at most 7 attempts in total).
 *
 * Returns the last tmq_subscribe() result, or TSDB_CODE_OUT_OF_MEMORY if the empty list
 * cannot be allocated.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    // tmq_subscribe would dereference the NULL list.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t rsp = 0;
  int32_t retryCnt = 0;
  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    }
    retryCnt++;
    taosMsleep(500);
  }

  tmq_list_destroy(lst);
  return rsp;
}

976 977
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
978

979
  // TODO stop timer
L
Liu Jicong 已提交
980 981 982 983
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
L
Liu Jicong 已提交
984

H
Haojun Liao 已提交
985 986 987 988 989
  if (tmq->delayedTask) {
    taosCloseQueue(tmq->delayedTask);
  }

  taosFreeQall(tmq->qall);
990
  tsem_destroy(&tmq->rspSem);
H
Haojun Liao 已提交
991
  taosThreadMutexDestroy(&tmq->lock);
L
Liu Jicong 已提交
992

993 994 995
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
996
    taosMemoryFreeClear(pTopic->schema.pSchema);
997 998
    taosArrayDestroy(pTopic->vgs);
  }
H
Haojun Liao 已提交
999

1000 1001 1002
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
1003 1004
}

1005 1006 1007 1008 1009 1010 1011 1012 1013
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
1014
  if (tmqMgmt.rsetId < 0) {
1015 1016 1017 1018
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
1019
/*
 * Create a consumer from conf.
 *
 * The consumer is registered in the tmq ref set. When conf->hbBgEnable is set, the
 * heartbeat timer is started as well. On failure NULL is returned and terrno describes the
 * error. errstr/errstrLen are currently unused.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  if (conf == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tscError("failed to create consumer, since conf is NULL");
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  // Initialise the semaphore before anything can jump to _failed: tmqFreeImpl always
  // destroys it, and it must never be destroyed uninitialised or twice.
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("failed to create consumer, groupId:%s, since init semaphore failed", conf->groupId);
    taosMemoryFree(pTmq);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->delayedTask = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  taosThreadMutexInit(&pTmq->lock, NULL);

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(),
             conf->groupId);
    goto _failed;
  }

  // An empty group id is a caller error, not an allocation failure.
  if (conf->groupId[0] == 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId is empty", pTmq->consumerId, terrstr());
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    // rspSem is destroyed by tmqFreeImpl; destroying it here as well was a double destroy.
    goto _failed;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    goto _failed;
  }

  if (pTmq->hbBgEnable) {
    int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      // pTmq is owned by the ref set now; removing the ref frees it through tmqFreeImpl.
      taosRemoveRef(tmqMgmt.rsetId, pTmq->refId);
      return NULL;
    }
    *pRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
  }

  char         buf[80] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tscInfo("consumer:0x%" PRIx64 " is setup, groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
          pTmq->consumerId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf,
          pTmq->hbBgEnable);

  return pTmq;

_failed:
  tmqFreeImpl(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1113
/*
 * Replace the consumer's subscription with the topics in topic_list. tmq_unsubscribe uses
 * an empty list.
 *
 * 1. Sends TDMT_MND_TMQ_SUBSCRIBE to the mnode and blocks on a local semaphore until
 *    tmqSubscribeCb reports the result.
 * 2. Fetches topic endpoints with tmqAskEp(tmq, false), retrying every 500ms while it
 *    returns TSDB_CODE_MND_CONSUMER_NOT_READY. Other tmqAskEp errors are not treated as
 *    fatal here.
 * 3. Starts the ep timer and, with auto-commit enabled, the commit timer, each only if it
 *    is not running yet.
 *
 * Returns 0 on success. Possible errors:
 * - TSDB_CODE_MND_CONSUMER_NOT_READY if the consumer never became ready.
 * - TSDB_CODE_INVALID_PARA for NULL arguments.
 * - Another error code on other failures.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return TSDB_CODE_INVALID_PARA;
  }

  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      // previously jumped to FAIL with code still 0, reporting success
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      taosMemoryFree(topicFName);  // not owned by req.topicNames yet
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // previously jumped to FAIL with code still 0, reporting success
    code = TAOS_SYSTEM_ERROR(errno);
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;  // stack object: valid because we wait for the callback below
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 40) {
      // previously returned 0; report the condition tmq_unsubscribe retries on
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1237
/*
 * Register the callback and opaque user pointer that tmq_consumer_new copies into the
 * consumer. The consumer hands them to tmqCommitInner for the delayed auto-commit task.
 * A NULL conf is ignored.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf == NULL) {
    return;
  }
  conf->commitCb = cb;
  conf->commitCbUserParam = param;
}
1241

D
dapan1121 已提交
1242
/*
 * Transport callback for TDMT_VND_TMQ_CONSUME (sent by doTmqPollImpl).
 *
 * pParam, pMsg->pData and pMsg->pEpSet are freed on every path. On success the decoded
 * response is wrapped in an SMqPollRspWrapper and written to tmq->mqueue. Responses from
 * an older epoch are dropped. While the consumer is alive, tmq->rspSem is posted so a
 * waiting poller wakes up. The consumer reference acquired here is released on every path
 * that acquired it.
 *
 * Returns 0 when the response was queued or intentionally dropped, -1 otherwise.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  int64_t         refId = pParam->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    // pParam->rspSem is not touched: doTmqPollImpl allocates pParam without ever
    // initialising that semaphore, so destroying it was undefined behaviour.
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t  epoch = pParam->epoch;
  int32_t  vgId = pParam->vgId;
  uint64_t requestId = pParam->requestId;

  taosMemoryFree(pParam);

  if (code != 0) {
    tscWarn("consumer:0x%"PRIx64" msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%"PRIx64, tmq->consumerId, vgId,
        epoch, tstrerror(code), requestId);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);

    // in case of consumer mismatch, wait for 500ms and retry
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      taosMsleep(500);
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      tscDebug("consumer:0x%" PRIx64" wait for the re-balance, wait for 500ms and set status to be RECOVER", tmq->consumerId);
    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pRspWrapper == NULL) {
        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
                tmq->consumerId, vgId, epoch, requestId);
        goto CREATE_MSG_FAIL;
      }

      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }

    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%"PRIx64,
            tmq->consumerId, vgId, msgEpoch, tmqEpoch, requestId);

    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosReleaseRef(tmqMgmt.rsetId, refId);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("consumer:0x%"PRIx64" mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%"PRIx64, tmq->consumerId, vgId,
        msgEpoch, tmqEpoch, requestId);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("consumer:0x%"PRIx64" msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // Each response body follows an SMqRspHead; after decoding, the head is copied over the
  // start of the decoded struct.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%" PRId64
             " type %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, rspType, requestId);
  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId,
           tmq->mqueue->numOfItems, requestId);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  // Balance taosAcquireRef; without this the consumer could never be freed.
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;

CREATE_MSG_FAIL:
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return -1;
}

H
Haojun Liao 已提交
1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387
/*
 * Build a client-side topic from one topic entry of an ask-ep response.
 *
 * The schema is moved out of pTopicEp: the source is cleared so it is not freed twice.
 * Every vgroup starts IDLE. Its starting offset is the one saved in pVgOffsetHashMap under
 * makeTopicVgroupKey(topicName, vgId), or an offset of type tmq->resetOffsetCfg when no
 * saved offset exists.
 */
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
                                   tmq_t* tmq) {
  pTopic->schema = pTopicEp->schema;
  pTopicEp->schema.nCols = 0;
  pTopicEp->schema.pSchema = NULL;

  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);

  int32_t numOfVgs = taosArrayGetSize(pTopicEp->vgs);
  tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgs);
  pTopic->vgs = taosArrayInit(numOfVgs, sizeof(SMqClientVg));

  char key[TSDB_TOPIC_FNAME_LEN + 22];
  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, idx);

    makeTopicVgroupKey(key, pTopic->topicName, pVgEp->vgId);
    STqOffsetVal* pSaved = taosHashGet(pVgOffsetHashMap, key, strlen(key));

    SMqClientVg clientVg = {
        .pollCnt = 0,
        .currentOffset = (pSaved != NULL) ? *pSaved : (STqOffsetVal){.type = tmq->resetOffsetCfg},
        .vgId = pVgEp->vgId,
        .epSet = pVgEp->epSet,
        .vgStatus = TMQ_VG_STATUS__IDLE,
        .vgSkipCnt = 0,
    };

    taosArrayPush(pTopic->vgs, &clientVg);
  }
}

/*
 * Element destructor for taosArrayDestroyEx over an array of SMqClientTopic. It releases
 * the topic's schema columns and its vgroup array. The element storage itself belongs to
 * the array.
 */
static void freeClientVgInfo(void* param) {
  SMqClientTopic* pTopic = param;
  // Free unconditionally, matching tmqFreeImpl. The old nCols != 0 gate leaked a non-NULL
  // pSchema that described zero columns; freeing NULL is a no-op.
  taosMemoryFreeClear(pTopic->schema.pSchema);

  taosArrayDestroy(pTopic->vgs);
}

static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
1420 1421
  bool set = false;

1422
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
1423
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
1424

X
Xiaoyu Wang 已提交
1425 1426
  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
1427
           tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
1428 1429 1430 1431 1432 1433

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

H
Haojun Liao 已提交
1434 1435
  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pVgOffsetHashMap == NULL) {
1436 1437 1438
    taosArrayDestroy(newTopics);
    return false;
  }
1439

H
Haojun Liao 已提交
1440
  // todo extract method
1441 1442 1443 1444 1445
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
1446
      tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur);
1447 1448
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
H
Haojun Liao 已提交
1449 1450
        makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);

L
Liu Jicong 已提交
1451
        char buf[80];
L
Liu Jicong 已提交
1452
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
H
Haojun Liao 已提交
1453
        tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
L
Liu Jicong 已提交
1454
                 pVgCur->vgId, vgKey, buf);
H
Haojun Liao 已提交
1455
        taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
1456 1457 1458 1459 1460 1461 1462
      }
    }
  }

  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
H
Haojun Liao 已提交
1463
    initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
1464 1465
    taosArrayPush(newTopics, &topic);
  }
1466

H
Haojun Liao 已提交
1467 1468 1469
  taosHashCleanup(pVgOffsetHashMap);

  taosThreadMutexLock(&tmq->lock);
1470
  // destroy current buffered existed topics info
1471
  if (tmq->clientTopics) {
H
Haojun Liao 已提交
1472
    taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
X
Xiaoyu Wang 已提交
1473
  }
1474

H
Haojun Liao 已提交
1475 1476
  tmq->clientTopics = newTopics;
  taosThreadMutexUnlock(&tmq->lock);
1477

H
Haojun Liao 已提交
1478 1479
  int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY;
  atomic_store_8(&tmq->status, flag);
X
Xiaoyu Wang 已提交
1480
  atomic_store_32(&tmq->epoch, epoch);
H
Haojun Liao 已提交
1481

1482
  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
X
Xiaoyu Wang 已提交
1483 1484 1485
  return set;
}

H
Haojun Liao 已提交
1486
/*
 * Transport callback for the ask-ep request.
 *
 * Sync mode (pParam->async == 0): a caller is waiting on pParam->rspSem. The result goes
 * into pParam->code and the semaphore is posted. pParam is not freed here; presumably the
 * waiting caller frees it.
 * Async mode: pParam is freed here. A response with a newer epoch is decoded into an
 * SMqAskEpRspWrapper and queued on tmq->mqueue.
 *
 * Responses whose epoch is not newer than the consumer's current epoch are ignored.
 * pMsg->pData and pMsg->pEpSet are always freed, and the consumer reference acquired here
 * is released.
 */
static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  int64_t          refId = pParam->refId;  // saved: pParam may be freed before the ref is released
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  if (tmq == NULL) {
    if (!async) {
      // The sync caller is blocked on rspSem. Report the failure and wake it; destroying a
      // semaphore that is still being waited on left the caller hanging.
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  pParam->code = code;
  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, pParam->async,
             tstrerror(code));
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  if (head->epoch <= epoch) {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
             tmq->consumerId, head->epoch, epoch);
    goto END;
  }

  tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
           head->epoch, epoch);

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    tsem_post(&pParam->rspSem);  // pParam must not be touched after this
  } else {
    taosMemoryFree(pParam);
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  // Balance taosAcquireRef; without this the consumer could never be freed.
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return code;
}

L
Liu Jicong 已提交
1558
/*
 * Fill a poll request for one vgroup of one topic.
 *
 * subKey is "<groupId><TMQ_SEPARATOR><topicName>". The request starts reading at the
 * vgroup's current offset and carries a freshly generated request id.
 */
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // Bounded formatting: the former memcpy + strcpy pair wrote with no check against the
  // size of subKey.
  snprintf(pReq->subKey, sizeof(pReq->subKey), "%s%c%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);

  pReq->withTbName = tmq->withTbName;
  pReq->consumerId = tmq->consumerId;
  pReq->timeout = timeout;
  pReq->epoch = tmq->epoch;
  pReq->reqOffset = pVg->currentOffset;
  pReq->head.vgId = pVg->vgId;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1575 1576
/*
 * Convert a queued meta poll response into the SMqMetaRspObj returned to the user.
 * The decoded SMqMetaRsp is shallow-copied, so any heap pointers inside it are now
 * referenced by the result object.
 *
 * Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY if allocation fails.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1586 1587 1588
/*
 * Convert a queued data poll response into the SMqRspObj returned to the user.
 *
 * The decoded SMqDataRsp is shallow-copied into the result, and iteration starts before
 * the first block (resIter = -1). When the response carries no schema, the topic's cached
 * schema is used for the result set.
 *
 * Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY if allocation fails.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1604 1605
/*
 * Convert a queued taosx (data + metadata) poll response into the SMqTaosxRspObj returned
 * to the user.
 *
 * The decoded STaosxRsp is shallow-copied into the result, and iteration starts before the
 * first block (resIter = -1). When the response carries no schema, the topic's cached
 * schema is used for the result set.
 *
 * Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY if allocation fails.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654
/*
 * Undo the claim on a vgroup whose poll request never reached the transport.
 * The vgroup is put back to IDLE so it can be polled again, and rspSem is posted so a
 * poller waiting on it wakes up. Always reports failure (-1).
 */
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
  const int32_t failure = -1;

  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&pTmq->rspSem);

  return failure;
}

/*
 * Build and asynchronously send one poll (consume) request for vgroup pVg of topic pTopic.
 *
 * The request is serialized twice: first to size the buffer, then into it. The buffer and an
 * SMqPollCbParam (consumer ref, epoch, topic/vgroup handles, vgId, request id) are attached to
 * an SMsgSendInfo. That SMsgSendInfo is handed to asyncSendMsgToServer with tmqPollCb as the
 * response callback.
 *
 * If the request cannot be built, everything allocated so far is freed and
 * handleErrorBeforePoll() puts the vgroup back to IDLE and posts rspSem.
 *
 * Returns TSDB_CODE_SUCCESS once the message is handed to the transport, or -1 on failure.
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 */
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq req = {0};
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first pass with a NULL buffer only computes the encoded length
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    return handleErrorBeforePoll(pVg, pTmq);
  }

  char* msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return handleErrorBeforePoll(pVg, pTmq);
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  // calloc so any SMqPollCbParam field not assigned below is zero rather than garbage
  SMqPollCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    taosMemoryFree(msg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return handleErrorBeforePoll(pVg, pTmq);
  }

  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  pParam->pVg = pVg;  // pVg may be released, fix it (pre-existing TODO: raw pointer kept until the rsp)
  pParam->pTopic = pTopic;
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(msg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return handleErrorBeforePoll(pVg, pTmq);
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = msg,
      .len = msgSize,
      .handle = NULL,
  };

  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  int64_t transporterId = 0;
  char    offsetFormatBuf[80];
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->currentOffset);

  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
           pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

  pVg->pollCnt++;
  pTmq->pollCnt++;

  return TSDB_CODE_SUCCESS;
}

1693
// broadcast the poll request to all related vnodes
H
Haojun Liao 已提交
1694
/*
 * Issue a poll request to every vgroup of every subscribed topic.
 *
 * Each vgroup is claimed with a compare-and-swap IDLE -> WAIT. A vgroup already in WAIT has an
 * outstanding request, so it is skipped and its skip counter is bumped. A vgroup that gets
 * polled has its skip counter reset.
 *
 * Returns 0 once all vgroups were visited, or the first non-success code from doTmqPollImpl.
 * In that case the remaining vgroups are not polled in this round.
 */
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  int32_t topicNum = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, topicNum);

  for (int32_t t = 0; t < topicNum; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    int32_t         vgNum = taosArrayGetSize(pTopic->vgs);

    for (int32_t v = 0; v < vgNum; ++v) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      int32_t      prevStatus =
          atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);

      if (prevStatus == TMQ_VG_STATUS__WAIT) {
        // a poll-rsp is still outstanding for this vgroup
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
                 pVg->vgId, skipped);
        continue;
      }

      atomic_store_32(&pVg->vgSkipCnt, 0);

      int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return 0;
}

H
Haojun Liao 已提交
1730
/*
 * Handle a queued response that is not a poll response.
 *
 * Only ask-ep responses (TMQ_MSG_TYPE__EP_RSP) are understood.
 *  - If the response's epoch is newer than the consumer's, the endpoints are updated, the
 *    decoded ask-ep payload is released and *pReset is set to true.
 *  - Otherwise the payload is released and *pReset is set to false.
 *
 * Returns 0 for an ask-ep response. Returns -1 for any other type, in which case *pReset is
 * left untouched.
 */
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    // stale or current epoch: nothing to apply
    tmqFreeRspWrapper(rspWrapper);
    *pReset = false;
    return 0;
  }

  SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
  tmqUpdateEp(tmq, rspWrapper->epoch, pEpRsp);
  tDeleteSMqAskEpRsp(pEpRsp);
  *pReset = true;
  return 0;
}

H
Haojun Liao 已提交
1750
/*
 * Drain the consumer's response queues until a result can be returned to the user.
 *
 * Items are taken from tmq->qall. When qall is empty, everything currently in tmq->mqueue is
 * moved into qall and one more item is tried. Returns NULL when both queues are empty.
 *
 * Handling by response type:
 *  - END_RSP: returns NULL with terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET.
 *  - POLL_RSP / POLL_META_RSP / TAOSX_RSP whose epoch matches the consumer's: the vgroup's
 *    current offset is advanced and the vgroup is marked IDLE so it can be polled again.
 *    Empty data/taosx responses are released and draining continues; otherwise a result object
 *    is built and returned.
 *  - POLL_* responses with a mismatching epoch are released and discarded.
 *  - anything else goes to tmqHandleNoPollRsp. If pollIfReset is set and that caused an
 *    epoch reset, a new round of poll requests is sent.
 */
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);

  while (1) {
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);

    if (rspWrapper == NULL) {
      // qall exhausted: pull everything queued in mqueue so far and try once more
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);

      if (rspWrapper == NULL) {
        return NULL;
      }
    }

    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
      return NULL;
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        if (pollRspWrapper->dataRsp.blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d", tmq->consumerId, pVg->vgId);
          // Nothing from this response reaches the user, so release its decoded payload (as the
          // epoch-mismatch path does) before dropping the queue item.
          // NOTE(review): assumes rspOffset (already assigned above) owns no memory that
          // tmqFreeRspWrapper releases -- confirm.
          tmqFreeRspWrapper(rspWrapper);
          taosFreeQitem(pollRspWrapper);
          continue;
        }

        char buf[80];
        tFormatOffset(buf, tListLen(buf), &pVg->currentOffset);

        // the payload is shallow-copied into pRsp; only the queue item shell is freed here
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d", tmq->consumerId, pVg->vgId,
                 buf, pollRspWrapper->dataRsp.blockNum);

        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          // see the data path above: release the decoded payload, then the queue item
          tmqFreeRspWrapper(rspWrapper);
          taosFreeQitem(pollRspWrapper);
          continue;
        }

        // without created tables the result is exposed as a plain data result
        void* pRsp = NULL;
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        } else {
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
        }
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else {
      bool reset = false;
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
    }
  }
}

1868
/*
 * Poll the consumer for its next result.
 *
 * timeout is in milliseconds. With timeout == -1 the call keeps waiting, re-checking at least
 * every 1000 ms, until a result is available.
 *
 * Returns a TMQ result object, or NULL when:
 *  - the consumer is still in INIT status (after a 500 ms sleep);
 *  - in RECOVER status, the mnode keeps reporting TSDB_CODE_MND_CONSUMER_NOT_READY past the
 *    retry budget;
 *  - an END_RSP reports there is no committed offset;
 *  - the timeout elapsed.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj = NULL;
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);

  // no subscription established yet: nothing to poll, back off briefly and return
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 40) {
        return NULL;
      }

      tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);

    if (tmqPollImpl(tmq, timeout) < 0) {
      // best effort: keep draining responses that already arrived even if sending failed
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    // tmqHandleAllRsp sets terrno to TSDB_CODE_TQ_NO_COMMITTED_OFFSET only when it consumes an
    // END_RSP. Clear it first so a value left over from an earlier call (terrno is sticky)
    // cannot make the check below return NULL without waiting.
    terrno = TSDB_CODE_SUCCESS;
    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout != -1) {
      int64_t currentTime = taosGetTimestampMs();
      int64_t elapsedTime = currentTime - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
                 tmq->consumerId, tmq->epoch, startTime, currentTime);
        return NULL;
      }
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
    }
  }
}

L
Liu Jicong 已提交
1938
/*
 * Close a consumer.
 *
 * If the consumer is READY, it first commits its offsets synchronously. A commit failure is
 * returned immediately and the consumer is left open.
 *
 * It then subscribes to a freshly created list from tmq_list_new(); presumably the list is
 * empty, making this an unsubscribe -- NOTE(review): confirm. The subscribe is retried up to 6
 * times, 500 ms apart, while the mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY. Its final
 * result is not checked (best effort).
 *
 * Finally the consumer is removed from the TMQ reference set.
 *
 * Returns 0, the commit error, or TSDB_CODE_OUT_OF_MEMORY if the topic list cannot be allocated.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    tmq_list_t* lst = tmq_list_new();
    if (lst == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    int32_t retryCnt = 0;
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }
      retryCnt++;
      taosMsleep(500);
    }

    tmq_list_destroy(lst);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1963

L
Liu Jicong 已提交
1964 1965
/*
 * Map a TMQ API return code to a human-readable message.
 * 0 -> "success", -1 -> "fail"; any other value is delegated to tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1973

L
Liu Jicong 已提交
1974 1975 1976 1977 1978
/*
 * Classify a result handle returned by tmq_consumer_poll.
 * Returns TMQ_RES_DATA, TMQ_RES_TABLE_META or TMQ_RES_METADATA according to the handle's result
 * type, and TMQ_RES_INVALID for anything else.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  tmq_res_t type = TMQ_RES_INVALID;

  if (TD_RES_TMQ(res)) {
    type = TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
    type = TMQ_RES_TABLE_META;
  } else if (TD_RES_TMQ_METADATA(res)) {
    type = TMQ_RES_METADATA;
  }

  return type;
}

L
Liu Jicong 已提交
1986
/*
 * Return the topic name of a TMQ result (data, meta or metadata), or NULL if res is NULL or is
 * not a TMQ result.
 *
 * The stored name is qualified as "<prefix>.<topic>", where the prefix is presumably the account
 * id (NOTE(review): confirm). Only the part after the first '.' is returned. If no '.' is present
 * the whole stored name is returned, rather than computing strchr(...) + 1 on NULL, which is
 * undefined behavior.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2001 2002 2003 2004
/*
 * Return the database name of a TMQ result (data, meta or metadata), or NULL if res is NULL or
 * is not a TMQ result.
 *
 * The stored name is qualified as "<prefix>.<db>", where the prefix is presumably the account id
 * (NOTE(review): confirm). Only the part after the first '.' is returned. If no '.' is present
 * the whole stored name is returned instead of dereferencing NULL + 1.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2016 2017 2018 2019
/*
 * Return the id of the vgroup a TMQ result (data, meta or metadata) came from, or -1 if res is
 * not a TMQ result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
2030 2031 2032 2033 2034 2035 2036 2037

/*
 * Return the table name of the block currently selected in a TMQ data or metadata result.
 *
 * Returns NULL when any of the following holds:
 *  - the result is neither a data nor a metadata result;
 *  - it was not produced with table names (withTbName == 0);
 *  - it carries no name array;
 *  - no valid block is selected (resIter outside [0, blockNum)).
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pDataObj = (SMqRspObj*)res;
    bool       hasName = pDataObj->rsp.withTbName && pDataObj->rsp.blockTbName != NULL && pDataObj->resIter >= 0 &&
                   pDataObj->resIter < pDataObj->rsp.blockNum;
    return hasName ? (const char*)taosArrayGetP(pDataObj->rsp.blockTbName, pDataObj->resIter) : NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pMixObj = (SMqTaosxRspObj*)res;
    bool            hasName = pMixObj->rsp.withTbName && pMixObj->rsp.blockTbName != NULL && pMixObj->resIter >= 0 &&
                   pMixObj->resIter < pMixObj->rsp.blockNum;
    return hasName ? (const char*)taosArrayGetP(pMixObj->rsp.blockTbName, pMixObj->resIter) : NULL;
  }

  return NULL;
}
2049

L
Liu Jicong 已提交
2050
/*
 * Commit offsets asynchronously; cb(tmq, code, param) is invoked when the commit finishes.
 * Delegates to tmqCommitInner with the flag pair (0, 1). These presumably mean "user-requested
 * (not automatic)" and "async" -- NOTE(review): confirm against tmqCommitInner.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  const int8_t isAutomatic = 0;
  const int8_t isAsync = 1;
  tmqCommitInner(tmq, msg, isAutomatic, isAsync, cb, param);
}

2054
/*
 * Commit offsets synchronously and return the result code of the commit.
 * Delegates to tmqCommitInner with the flag pair (0, 0) and no user callback.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}
2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161

/*
 * Ask the mnode for this consumer's endpoints (TDMT_MND_TMQ_ASK_EP).
 *
 * An SMqAskEpReq (consumer id, current epoch, consumer group) is serialized and sent
 * asynchronously; the response is handled by tmqAskEpCb.
 *  - async == true : returns TSDB_CODE_SUCCESS as soon as the request is handed to the
 *                    transport. pParam is not touched again here (NOTE(review): presumably
 *                    tmqAskEpCb releases it -- confirm).
 *  - async == false: blocks on pParam->rspSem until the callback posts it, then returns
 *                    pParam->code and releases the semaphore and pParam.
 *
 * Returns -1 if the request cannot be built; terrno is set to TSDB_CODE_OUT_OF_MEMORY on
 * allocation failure.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = TSDB_CODE_SUCCESS;

  SMqAskEpReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
  // bounded copy: cgroup is a fixed-size array, never overrun it
  tstrncpy(req.cgroup, tmq->groupId, sizeof(req.cgroup));

  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
    return -1;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
    taosMemoryFree(pReq);
    return -1;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;
  tsem_init(&pParam->rspSem, 0, 0);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
           sendInfo->requestId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // the semaphore was initialized above; release it together with its owner
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  return code;
}

/*
 * Format the key "<topicName>:<vg>" into dst.
 * dst must be large enough for the result plus the terminating NUL; no bounds check is done.
 * Returns the number of characters written (excluding the NUL), or a negative value on an
 * output error, exactly as sprintf does.
 */
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

/*
 * Finish a commit once every per-vgroup commit response has arrived.
 *
 * Async commits:
 *  - automatic: the consumer's commitCb is invoked (if set);
 *  - otherwise: the user callback stored in the param set is invoked (if set).
 *  In both cases the param set is freed here.
 * Sync commits: rspSem is posted to wake the committing thread, which takes over the param set.
 *
 * The consumer reference acquired here is released before returning. Without this, every
 * commit leaked one reference on the consumer.
 *
 * Returns 0, or -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED if the consumer no longer exists.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // Cache the ref id: in sync mode pParamSet may be freed by the waiter as soon as rspSem is posted.
  int64_t refId = pParamSet->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    // NOTE(review): in sync mode a thread may still be waiting on rspSem here; destroying and
    // freeing it is pre-existing behavior -- confirm the waiter cannot be blocked at this point.
    if (!pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
    }
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  if (pParamSet->async) {
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    taosMemoryFree(pParamSet);
  } else {
    // wake the synchronous committer; it owns pParamSet from here on
    tsem_post(&pParamSet->rspSem);
  }

  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}

/*
 * Record the arrival of one per-vgroup commit response.
 * The param set's outstanding-response counter is atomically decremented; when it reaches
 * zero the whole commit is finalized via tmqCommitDone().
 */
void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remain = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
           remain);

  if (remain != 0) {
    return;  // other vgroups still owe a response
  }

  tmqCommitDone(pParamSet);
}