clientTmq.c 55.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
31 32 33
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
L
Liu Jicong 已提交
34 35 36
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
37

L
Liu Jicong 已提交
38 39 40 41 42 43
/*
 * Leading fields shared by the response wrappers in this file
 * (SMqAskEpRspWrapper and SMqPollRspWrapper start with the same two members).
 * Queue items are read through this type in tmqClearUnhandleMsg().
 */
typedef struct {
  int8_t  tmqRspType;  // kind of response carried by the enclosing wrapper
  int32_t epoch;       // epoch value recorded with the response
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
44 45 46
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
47 48
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
49
struct tmq_list_t {
L
Liu Jicong 已提交
50
  SArray container;
L
Liu Jicong 已提交
51
};
L
Liu Jicong 已提交
52

L
Liu Jicong 已提交
53
struct tmq_conf_t {
54 55 56 57 58
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
59 60
  int8_t  snapEnable;
  int32_t snapBatchSize;
61 62 63

  bool hbBgEnable;

64 65 66 67 68
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
69
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
70
  void*          commitCbUserParam;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
74
  int64_t refId;
L
Liu Jicong 已提交
75
  // conf
76 77 78 79 80 81 82 83 84 85 86
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
87 88
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
89 90 91 92

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
93 94
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
95
  int32_t epSkipCnt;
L
Liu Jicong 已提交
96
#endif
L
Liu Jicong 已提交
97 98
  int64_t pollCnt;

L
Liu Jicong 已提交
99
  // timer
100 101
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
102 103 104
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
105 106 107 108
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
109
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
110
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
111
  STaosQall*  qall;
L
Liu Jicong 已提交
112 113 114 115
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
116 117
};

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
/* Per-vgroup status values (presumably stored in SMqClientVg.vgStatus - confirm). */
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT = 1,
};

/* Consumer lifecycle states (presumably stored in tmq_t.status - confirm). */
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
130
/*
 * Task kinds written to tmq_t.delayedTask by the timer callbacks and
 * dispatched in tmqHandleAllDelayedTask().
 */
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
136
typedef struct {
137 138 139
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
142
  // connection info
143
  int32_t vgId;
X
Xiaoyu Wang 已提交
144
  int32_t vgStatus;
L
Liu Jicong 已提交
145
  int32_t vgSkipCnt;
146 147 148
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
149
typedef struct {
150
  // subscribe info
151 152
  char topicName[TSDB_TOPIC_FNAME_LEN];
  char db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
153 154 155

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
156
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
168
  };
L
Liu Jicong 已提交
169 170
} SMqPollRspWrapper;

L
Liu Jicong 已提交
171
typedef struct {
172 173
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
174 175
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
176
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
177

L
Liu Jicong 已提交
178
typedef struct {
179 180
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
181
  int32_t code;
L
Liu Jicong 已提交
182
  int32_t async;
X
Xiaoyu Wang 已提交
183
  tsem_t  rspSem;
184 185
} SMqAskEpCbParam;

L
Liu Jicong 已提交
186
typedef struct {
187 188
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
189
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
190
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
191
  int32_t         vgId;
L
Liu Jicong 已提交
192
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
193
} SMqPollCbParam;
194

195
typedef struct {
196 197
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
198 199
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
200 201
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
202
  int32_t        rspErr;
203
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
204 205 206 207
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
208 209 210 211 212
} SMqCommitCbParamSet;

/* Per-request context handed to tmqCommitCb(). */
typedef struct {
  SMqCommitCbParamSet* params;   // shared state of the enclosing commit call
  STqOffset*           pOffset;  // offset being committed; freed in tmqCommitCb()
} SMqCommitCbParam;
214

215
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
216
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
217
  conf->withTbName = false;
L
Liu Jicong 已提交
218
  conf->autoCommit = true;
L
Liu Jicong 已提交
219
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
220
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
221
  conf->hbBgEnable = true;
222 223 224
  return conf;
}

L
Liu Jicong 已提交
225
/*
 * Release a configuration created by tmq_conf_new(), including the heap
 * copies of ip/user/pass made by tmq_conf_set(). A NULL conf is ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
235 236
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
237
    return TMQ_CONF_OK;
238
  }
L
Liu Jicong 已提交
239

240 241
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
242 243
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
244

L
Liu Jicong 已提交
245 246
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
247
      conf->autoCommit = true;
L
Liu Jicong 已提交
248 249
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
250
      conf->autoCommit = false;
L
Liu Jicong 已提交
251 252 253 254
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
255
  }
L
Liu Jicong 已提交
256

L
Liu Jicong 已提交
257 258 259 260 261
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274 275
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
276

277 278
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
279
      conf->withTbName = true;
L
Liu Jicong 已提交
280
      return TMQ_CONF_OK;
281
    } else if (strcmp(value, "false") == 0) {
282
      conf->withTbName = false;
L
Liu Jicong 已提交
283
      return TMQ_CONF_OK;
284 285 286 287 288
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
289
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
290
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
291
      conf->snapEnable = true;
292 293
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
294
      conf->snapEnable = false;
295 296 297 298 299 300
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
301 302 303 304 305
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

306 307 308
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
309 310
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
311
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
312 313 314 315
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
316
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
317 318
  }

L
Liu Jicong 已提交
319
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
320 321 322
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
323
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
324 325 326
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
327
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
328 329 330
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
331
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
332 333 334
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
335
  if (strcmp(key, "td.connect.db") == 0) {
336
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
337 338 339
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
340
  return TMQ_CONF_UNKNOWN;
341 342 343
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
344 345
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
346 347
}

L
Liu Jicong 已提交
348 349
/*
 * Append a heap copy of the topic name src to list.
 * Names that do not start with a backtick are passed through strtolower()
 * before being stored.
 *
 * Returns 0 on success, -1 when list or src is NULL, src is empty, or an
 * allocation fails. The list owns the stored copy (freed by tmq_list_destroy).
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) return -1;  // the original dereferenced the NULL copy

  if (topic[0] != '`') {
    strtolower(topic, src);
  }
  if (taosArrayPush(container, &topic) == NULL) {
    // the copy is not owned by the array yet, so release it here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
359
/*
 * Destroy a topic list: every stored topic string is released with
 * taosMemoryFree and then the array itself is destroyed.
 */
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
364 365 366 367 368 369 370 371 372 373
/* Number of topic names currently stored in list. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/*
 * Expose the list's backing storage as an array of C strings.
 * The returned pointer aliases the list's internal buffer.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
374 375 376 377
/*
 * Format "<topicName>:<vg>" into dst and return the number of characters
 * written (excluding the terminator). dst must be large enough; the write is
 * not bounded.
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
/*
 * Finish a commit once no response is outstanding.
 *
 * Asynchronous commits: invoke the consumer-level callback (automatic) or the
 * caller-supplied callback, then free pParamSet.
 * Synchronous commits: post rspSem; the waiting caller owns pParamSet.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED when
 * the consumer can no longer be resolved (pParamSet is freed in that case).
 * NOTE(review): in that failure case a synchronous caller blocked on rspSem is
 * left with a destroyed semaphore and freed parameters - confirm this cannot
 * happen while a synchronous commit is in flight.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // keep a copy: pParamSet may be freed (async) or reclaimed by the waiter (sync) below
  int64_t refId = pParamSet->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    if (!pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
    }
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  if (pParamSet->async) {
    // call async cb func
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

  // balance the taosAcquireRef above; the original never released it
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}

410 411
/*
 * RPC callback for one commit-offset request.
 *
 * Frees the committed offset and the response payload, records a failure code
 * in the shared parameter set, and calls tmqCommitDone() when this was the
 * last outstanding response. param itself is not freed here (the sender
 * registers taosMemoryFree as its paramFreeFp).
 *
 * Always returns 0.
 */
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  taosMemoryFree(pParam->pOffset);
  if (pBuf && pBuf->pData) taosMemoryFree(pBuf->pData);

  // The original dropped `code`, so rspErr was never set and failures were
  // reported as success. The atomic decrement below publishes this store to
  // whichever thread observes the counter reaching zero.
  if (code != 0) {
    pParamSet->rspErr = code;
  }

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }
  return 0;
}

L
Liu Jicong 已提交
438 439 440 441 442 443
/*
 * Build and send one commit-offset request for (pTopic, pVg).
 *
 * The committed value is pVg->currentOffset and the sub-key is
 * "<groupId><TMQ_SEPARATOR><topicName>". On success waitingRspNum and
 * totalRspNum of pParamSet are incremented and the response is delivered to
 * tmqCommitCb().
 *
 * Returns 0 when the request was handed to the transport, -1 on failure.
 * On failure nothing is left allocated and pParamSet is untouched.
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset*        pOffset = NULL;
  void*             buf = NULL;
  SMqCommitCbParam* pParam = NULL;
  SMsgSendInfo*     pMsgSendInfo = NULL;

  pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pOffset->val = pVg->currentOffset;

  // NOTE(review): the copies below are not bounded by the size of subKey - confirm it can hold both names
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // account for the response before it can possibly arrive
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  // the original returned without releasing what had been allocated so far
  if (pParam) taosMemoryFree(pParam);
  if (buf) taosMemoryFree(buf);
  if (pOffset) taosMemoryFree(pOffset);
  return -1;
}

/*
 * Commit the offset of the single (topic, vgroup) that the result msg came from.
 *
 * msg must be a TMQ data, meta or metadata result; anything else returns
 * TSDB_CODE_TMQ_INVALID_MSG. A request is sent only when the vgroup's current
 * offset differs from its committed offset; otherwise the call returns 0
 * without doing anything.
 *
 * async != 0: returns 0; the outcome is delivered to userCb (also called with
 *             -1 when the request could not be sent).
 * async == 0: blocks until the response arrives and returns its result code,
 *             or -1 when the request could not be sent.
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // Tracked locally: once an async request is in flight the callback may free
  // pParamSet at any moment, so it must not be read again on that path.
  bool sent = false;

  for (int32_t i = 0; !sent && i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;
      if (pVg->currentOffset.type <= 0 || tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) continue;

      if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
        // nothing is in flight, so the parameter set is still ours to release
        tsem_destroy(&pParamSet->rspSem);
        taosMemoryFree(pParamSet);
        if (async) {
          if (userCb) userCb(tmq, -1, userParam);
          return 0;
        }
        return -1;
      }
      sent = true;
      break;
    }
  }

  if (!sent) {
    // offsets already committed, or topic/vgroup not found
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (async) {
    // pParamSet is released by tmqCommitDone() after the callback has run
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);  // the synchronous path of tmqCommitDone() only posts the semaphore
  return code;
}

L
Liu Jicong 已提交
580 581
/*
 * Commit offsets.
 *
 * With msg != NULL only the (topic, vgroup) of that message is committed
 * (delegated to tmqCommitMsgImpl; `automatic` is not used on that path).
 * With msg == NULL every vgroup of every subscribed topic whose current
 * offset differs from its committed offset is committed.
 *
 * async != 0: returns 0 once the requests are sent; completion is reported by
 *             tmqCommitDone() through tmq->commitCb (automatic) or userCb.
 * async == 0: blocks until all responses arrived and returns the recorded
 *             result code.
 * Returns -1 with terrno set when the parameter set cannot be allocated.
 */
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue: responses cannot drive the
  // counter to zero until this function has finished sending
  pParamSet->waitingRspNum = 1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        // PRId64 instead of %ld: the arguments are int64_t on every platform
        tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
        // a vgroup whose request cannot be sent is skipped; the others still commit
        (void)tmqSendCommitReq(tmq, pVg, pTopic, pParamSet);
      }
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // nothing was sent, so no callback will ever touch the parameter set
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // drop the guard count taken above
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }

  if (async) {
    // pParamSet now belongs to tmqCommitDone(); do not touch it again
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);

  // the original computed this code and then returned 0 unconditionally
  return code;
}

667
/*
 * Timer callback: queue an ASK_EP delayed task on the consumer identified by
 * the heap-allocated int64_t refId in param, and wake the consumer through
 * rspSem. param is always freed. If the consumer is gone nothing is queued.
 */
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // balance the acquire above; the original never released the reference
    taosReleaseRef(tmqMgmt.rsetId, refId);
  }
  taosMemoryFree(param);
}

/*
 * Timer callback: queue a COMMIT delayed task on the consumer identified by
 * the heap-allocated int64_t refId in param, and wake the consumer through
 * rspSem. param is always freed. If the consumer is gone nothing is queued.
 */
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = TMQ_DELAYED_TASK__COMMIT;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // balance the acquire above; the original never released the reference
    taosReleaseRef(tmqMgmt.rsetId, refId);
  }
  taosMemoryFree(param);
}

/*
 * Timer callback: queue a REPORT delayed task on the consumer identified by
 * the heap-allocated int64_t refId in param, and wake the consumer through
 * rspSem. param is always freed. If the consumer is gone nothing is queued.
 */
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = TMQ_DELAYED_TASK__REPORT;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // balance the acquire above; the original never released the reference
    taosReleaseRef(tmqMgmt.rsetId, refId);
  }
  taosMemoryFree(param);
}

703 704 705 706 707 708
/*
 * RPC callback for heartbeat requests: the response carries nothing of
 * interest, so its payload (if any) is released. Always returns 0.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL || pMsg->pData == NULL) {
    return 0;
  }
  taosMemoryFree(pMsg->pData);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
709 710 711
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
712
    taosMemoryFree(param);
713 714
    return;
  }
715 716 717 718
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
L
Liu Jicong 已提交
719
  pReq->consumerId = htobe64(consumerId);
720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
744
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
745 746
}

L
Liu Jicong 已提交
747 748 749 750 751 752 753 754
/*
 * Drain the consumer's delayed-task queue and run every task:
 *   ASK_EP - refresh endpoints asynchronously, then re-arm epTimer (1000 ms);
 *   COMMIT - start an automatic asynchronous commit, then re-arm commitTimer
 *            (tmq->autoCommitInterval);
 *   REPORT - currently a no-op.
 *
 * Returns 0, or -1 with terrno set when the drain buffer cannot be allocated.
 * If the refId for a timer cannot be allocated that timer is not re-armed.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);

      // the timer callback takes ownership of this refId copy and frees it
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
779
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
780
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
781 782 783 784 785 786 787 788
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
789
  msg = NULL;
L
Liu Jicong 已提交
790 791 792 793 794 795 796 797 798 799
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
800
/*
 * RPC callback for a subscribe request: store the response code in the
 * caller's SMqSubscribeCbParam and wake the thread waiting on rspSem.
 * Always returns 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  tsem_post(&pParam->rspSem);
  return 0;
}
806

L
Liu Jicong 已提交
807
/*
 * Append the names of the consumer's subscribed topics to *topics, creating
 * the list first when *topics is NULL. Each stored name is the part of the
 * internal topic name after its first '.', or the whole name when it has none.
 *
 * Returns 0, or -1 with terrno set when the list cannot be created.
 * Failures of individual appends are ignored (best effort).
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    // strchr returns NULL when there is no '.'; the original computed NULL + 1
    const char* dot = strchr(topic->topicName, '.');
    tmq_list_append(*topics, dot != NULL ? dot + 1 : topic->topicName);
  }
  return 0;
}

L
Liu Jicong 已提交
818
/*
 * Unsubscribe from all topics by subscribing to an empty topic list.
 * While the server answers TSDB_CODE_MND_CONSUMER_NOT_READY the request is
 * retried every 500 ms until the retry counter exceeds 5.
 *
 * Returns the result of the last tmq_subscribe() call, or -1 with terrno set
 * when the empty list cannot be allocated.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t rsp;
  int32_t retryCnt = 0;
  for (;;) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) break;
    retryCnt++;
    taosMsleep(500);
  }

  tmq_list_destroy(lst);
  return rsp;
}

836 837
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
838

839 840 841 842
  // TODO stop timer
  if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
  if (tmq->qall) taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
843

844
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
845

846 847 848
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
849
    if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
850 851 852 853 854 855
    int32_t vgSz = taosArrayGetSize(pTopic->vgs);
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
856 857
}

L
Liu Jicong 已提交
858
/*
 * Create a consumer from a configuration object.
 *
 * The first successful call also initialises the shared timer and the
 * consumer reference set kept in tmqMgmt. The new consumer gets its queues,
 * a copy of the relevant settings from conf, a generated consumer id, a
 * connection opened with taos_connect_internal() and a reference id in
 * tmqMgmt.rsetId. When conf->hbBgEnable is set a heartbeat timer is started.
 *
 * conf       configuration; groupId must be non-empty (asserted)
 * errstr     not used by this function
 * errstrLen  not used by this function
 *
 * Returns the new consumer, or NULL on failure.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here, so only information from conf can be logged
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  // The heartbeat timer parameter is allocated before the consumer is
  // registered, so that an allocation failure can still be unwound with a
  // plain tmqFreeImpl() call.
  int64_t* pRefId = NULL;
  if (pTmq->hbBgEnable) {
    pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      tmqFreeImpl(pTmq);
      return NULL;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    taosMemoryFree(pRefId);
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pTmq->hbBgEnable) {
    *pRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  // only reached before the semaphore and the connection are both live
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
962
/*
 * Subscribe the consumer to the topics in topic_list.
 *
 * A TDMT_MND_SUBSCRIBE request is built from the full topic names and sent
 * with asyncSendMsgToServer(); the function then blocks until tmqSubscribeCb
 * posts the response semaphore. After a successful response the endpoints
 * are fetched synchronously with tmqAskEp(), retrying every 500 ms while it
 * reports TSDB_CODE_MND_CONSUMER_NOT_READY. Finally the ask-ep timer and,
 * when auto commit is enabled, the commit timer are started if they are not
 * running yet.
 *
 * Returns 0 on success. On failure returns the response code of the
 * subscribe request, TSDB_CODE_MND_CONSUMER_NOT_READY when the retry limit
 * is exhausted, or -1 for local failures.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  tscDebug("call tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  // lives on this stack frame: the function does not return before the callback has posted rspSem
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // sendInfo has not been handed to the transport yet, so it is still ours to free
    taosMemoryFree(sendInfo);
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // code is still 0 from the subscribe response; report the real outcome
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1070
/*
 * Store the commit callback and its user parameter in the configuration.
 * tmq_consumer_new() copies both values into the consumer it creates.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1075

D
dapan1121 已提交
1076
/*
 * Response callback of a poll request sent to one vgroup.
 *
 * param is the SMqPollCbParam built by the sender; it is freed here. pMsg
 * carries the response; its pData and pEpSet buffers are released on every
 * path of this function.
 *
 * On success the response is decoded into a SMqPollRspWrapper according to
 * the message type in its header, written to tmq->mqueue, and tmq->rspSem is
 * posted. Responses whose epoch is older than the consumer's current epoch
 * are dropped.
 *
 * On error the vgroup is put back to TMQ_VG_STATUS__IDLE when the request
 * epoch still matches the consumer epoch, and tmq->rspSem is posted. For
 * TSDB_CODE_TQ_NO_COMMITTED_OFFSET an END_RSP wrapper is queued first; for
 * TSDB_CODE_TMQ_CONSUMER_MISMATCH the consumer status is set to RECOVER.
 *
 * Returns 0 when the response was queued or dropped as stale, -1 otherwise.
 *
 * NOTE(review): the reference obtained with taosAcquireRef is not released
 * anywhere in this function - confirm that this is intended.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
  taosMemoryFree(pParam);
  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    // both buffers are released once here; no later branch may free them again
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // In every branch the body after the SMqRspHead is decoded first, then the
  // raw header bytes are copied over the start of the decoded response.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(0);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;
CREATE_MSG_FAIL:
  // pVg is only touched when the request epoch still matches the consumer epoch
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

L
Liu Jicong 已提交
1190
/*
 * Replace the consumer's topic/vgroup assignment with the one in pRsp and
 * move the consumer to the given epoch.
 *
 * The current offset of every (topic, vgId) pair the consumer already had is
 * carried over to the new assignment; a pair that is new starts with an
 * offset whose type is tmq->resetOffsetCfg. The schema of each topic in pRsp
 * is moved into the new topic entry (the entry in pRsp is cleared). The old
 * topic array is destroyed, and the consumer status becomes NO_TOPIC or
 * READY depending on whether the new array is empty.
 *
 * Returns true when the new assignment contains at least one vgroup, false
 * otherwise or when the temporary containers cannot be allocated (in which
 * case nothing is changed).
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  // "<topicName>:<vgId>" -> current offset of that vgroup
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    // take over the schema so that it is not released together with pRsp
    topic.schema = pTopicEp->schema;
    pTopicEp->schema.nCols = 0;
    pTopicEp->schema.pSchema = NULL;
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // release the previous assignment
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
      taosArrayDestroy(pTopic->vgs);
    }
    taosArrayDestroy(tmq->clientTopics);
  }
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1284
/*
 * Response callback of the ask-endpoint request sent by tmqAskEp().
 *
 * param is the SMqAskEpCbParam built by tmqAskEp(). In asynchronous mode the
 * callback owns it and frees it. In synchronous mode tmqAskEp() is blocked
 * on pParam->rspSem and frees pParam itself after waking up, so this
 * function must post the semaphore exactly once on every path and must not
 * touch pParam afterwards.
 *
 * A response whose epoch is not newer than the consumer's epoch is ignored.
 * Otherwise, in synchronous mode the endpoints are applied right away with
 * tmqUpdateEp(); in asynchronous mode the decoded response is queued on
 * tmq->mqueue as a TMQ_MSG_TYPE__EP_RSP item and tmq->rspSem is posted.
 *
 * pMsg->pData is freed before returning. Returns code, or -1 when the
 * consumer no longer exists or the queue item cannot be allocated.
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (!async) {
      // The caller is waiting on rspSem: hand it the error and wake it up.
      // Destroying the semaphore here would leave it blocked forever.
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    taosMemoryFree(pMsg->pData);
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d, code %x", tmq->consumerId,
             pParam->async, code);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // after this post the waiter may free pParam at any time
    tsem_post(&pParam->rspSem);
  } else {
    taosMemoryFree(pParam);
  }
  taosMemoryFree(pMsg->pData);
  return code;
}

L
Liu Jicong 已提交
1351
/*
 * Ask the management node for the consumer's current endpoints.
 *
 * A TDMT_MND_MQ_ASK_EP request carrying the consumer id, epoch and group is
 * sent with asyncSendMsgToServer(); the response is handled by tmqAskEpCb().
 *
 * async == false: the call blocks until the callback posts the response
 *                 semaphore and returns the code the callback stored.
 * async == true:  the call returns 0 right after sending; the callback owns
 *                 and frees the callback parameter.
 *
 * Returns -1 when a local allocation or the semaphore initialisation fails.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t      code = 0;
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;

  // The semaphore is only used to wake up a synchronous caller; the callback
  // never touches it in asynchronous mode, so it is not created there.
  if (!async && tsem_init(&pParam->rspSem, 0, 0) != 0) {
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    if (!async) tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // pParam stays owned by this function in synchronous mode
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

1421
/*
 * Allocate and fill the poll request for one vgroup of one topic.
 *
 * The subscription key is the consumer group id, TMQ_SEPARATOR and the topic
 * name. The request offset is the vgroup's current offset, and a fresh
 * request id is generated. head.vgId and head.contLen are stored after
 * htonl().
 *
 * Returns the new request, or NULL when it cannot be allocated.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pPollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pPollReq == NULL) return NULL;

  // subKey = <groupId><TMQ_SEPARATOR><topicName>
  int32_t prefixLen = strlen(tmq->groupId);
  memcpy(pPollReq->subKey, tmq->groupId, prefixLen);
  pPollReq->subKey[prefixLen] = TMQ_SEPARATOR;
  strcpy(pPollReq->subKey + prefixLen + 1, pTopic->topicName);

  // settings taken from the consumer
  pPollReq->withTbName = tmq->withTbName;
  pPollReq->timeout = timeout;
  pPollReq->consumerId = tmq->consumerId;
  pPollReq->epoch = tmq->epoch;

  // position to read from and request identity
  pPollReq->reqOffset = pVg->currentOffset;
  pPollReq->reqId = generateRequestId();

  pPollReq->useSnapshot = tmq->useSnapshot;

  // message header
  pPollReq->head.vgId = htonl(pVg->vgId);
  pPollReq->head.contLen = htonl(sizeof(SMqPollReq));

  return pPollReq;
}

L
Liu Jicong 已提交
1450 1451
/*
 * Build the result object for a meta response.
 *
 * Topic name, database name and vgroup id come from the handles stored in
 * the wrapper; the meta response itself is copied bytewise, so any pointers
 * inside it are shared with the wrapper.
 *
 * Returns the new object, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when it cannot be allocated.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1461 1462 1463
/*
 * Build the result object for a data response.
 *
 * Topic name, database name and vgroup id come from the handles stored in
 * the wrapper; the data response is copied bytewise, so any pointers inside
 * it are shared with the wrapper. The result iterator starts at -1. When the
 * response does not carry its own schema, the topic's schema is installed
 * in the result info.
 *
 * Returns the new object, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when it cannot be allocated.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1479 1480
/*
 * Build the result object for a taosx (meta + data) response.
 *
 * Topic name, database name and vgroup id come from the handles stored in
 * the wrapper; the taosx response is copied bytewise, so any pointers inside
 * it are shared with the wrapper. The result iterator starts at -1. When the
 * response does not carry its own schema, the topic's schema is installed
 * in the result info.
 *
 * Returns the new object, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when it cannot be allocated.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1497
/*
 * Send one asynchronous poll request to every vgroup, of every topic in
 * tmq->clientTopics, whose status can be switched from IDLE to WAIT.
 *
 * Vgroups that are not IDLE are skipped and their skip counter is incremented.
 *
 * @param tmq      consumer handle
 * @param timeout  forwarded to tmqBuildConsumeReqImpl when building the request
 * @return 0 after all vgroups have been visited; -1 as soon as building the
 *         request or allocating the callback/send structures fails. On failure
 *         the affected vgroup is put back to IDLE and tmq->rspSem is posted.
 */
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      // claim the vgroup: only an IDLE -> WAIT transition lets us send a request
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      // context handed to tmqPollCb together with the response
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      pParam->vgId = pVg->vgId;

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      int64_t transporterId = 0;

      char offsetFormatBuf[80];
      tFormatOffset(offsetFormatBuf, sizeof(offsetFormatBuf), &pVg->currentOffset);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);

      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1575 1576
/*
 * Handle a queued response that is not a poll response.
 *
 * Only TMQ_MSG_TYPE__EP_RSP is accepted. When the response carries an epoch
 * newer than the consumer's current one, the endpoint information is applied
 * through tmqUpdateEp and *pReset is set to true; otherwise *pReset is false.
 *
 * @return 0 for an endpoint response, -1 for any other type (in which case
 *         *pReset is left untouched).
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
    SMqAskEpRspWrapper* epWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tmqUpdateEp(tmq, rspWrapper->epoch, &epWrapper->msg);
    *pReset = true;
    return 0;
  }

  *pReset = false;
  return 0;
}

L
Liu Jicong 已提交
1593
/*
 * Drain queued responses until one yields a result object for the caller.
 *
 * Items are taken from tmq->qall; when that is empty it is refilled once from
 * tmq->mqueue. Each item is dispatched on its tmqRspType:
 *   - END_RSP:       terrno is set to TSDB_CODE_TQ_NO_COMMITTED_OFFSET, NULL returned.
 *   - POLL_RSP:      returns an object built by tmqBuildRspFromWrapper.
 *   - POLL_META_RSP: returns an object built by tmqBuildMetaRspFromWrapper.
 *   - TAOSX_RSP:     returns an object built by tmqBuildRspFromWrapper when
 *                    createTableNum is 0, by tmqBuildTaosxRspFromWrapper otherwise.
 *   - anything else: passed to tmqHandleNoPollRsp; if that reports a reset and
 *                    pollIfReset is true, tmqPollImpl is called again.
 * Poll responses whose epoch differs from the consumer's epoch are discarded.
 * Data/taosx responses with blockNum == 0 only update the vgroup offset.
 *
 * @return the built response object, or NULL when both queues are empty or an
 *         END_RSP was seen.
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  while (1) {
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
    }

    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      // consumerId is 64-bit: use PRId64 like the other log statements
      tscDebug("consumer %" PRId64 " actual process poll rsp", tmq->consumerId);
      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        // advance the offset first, then release the vgroup for the next poll
        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        if (pollRspWrapper->dataRsp.blockNum == 0) {
          taosFreeQitem(pollRspWrapper);
          continue;
        }
        // build rsp
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          taosFreeQitem(pollRspWrapper);
          continue;
        }

        // build rsp: the builder depends on whether create-table entries are present
        void* pRsp = NULL;
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        } else {
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
        }
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else {
      bool reset = false;
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
    }
  }
}

1693
/*
 * Poll the consumer for the next result.
 *
 * @param tmq      consumer handle
 * @param timeout  time budget in milliseconds (measured with taosGetTimestampMs);
 *                 -1 means keep polling with no deadline
 * @return a result object, or NULL when the consumer is still in INIT status,
 *         the consumer does not become ready after the retry limit, terrno is
 *         TSDB_CODE_TQ_NO_COMMITTED_OFFSET after handling responses, or the
 *         time budget is exhausted.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int64_t startTime = taosGetTimestampMs();

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) {
      // a failed send round is not fatal: already queued responses are still handled
      tscDebug("return since poll err");
    }

    void* rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsedTime = taosGetTimestampMs() - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // wait only for what is left of the budget, not for the time already spent
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1751
/*
 * Close a consumer.
 *
 * When the consumer is in READY status its offsets are committed synchronously
 * and it then subscribes to an empty topic list, retrying while the server
 * answers TSDB_CODE_MND_CONSUMER_NOT_READY (bounded by a retry counter, with a
 * 500 ms sleep between attempts). The consumer's reference is then removed
 * from the tmq reference set in every status, so a consumer closed while READY
 * no longer keeps its reference.
 *
 * @return the commit error code if the synchronous commit fails (the reference
 *         is NOT removed in that case), otherwise 0. The result of the final
 *         subscribe call is not reported.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      } else {
        retryCnt++;
        taosMsleep(500);
      }
    }

    tmq_list_destroy(lst);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1778

L
Liu Jicong 已提交
1779 1780
/*
 * Translate a tmq return code into a message string.
 * 0 maps to "success", -1 to "fail"; every other value is looked up with
 * tstrerror.
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1788

L
Liu Jicong 已提交
1789 1790 1791 1792
/*
 * Classify a result object returned by the consumer.
 *
 * Data results map to TMQ_RES_DATA. Meta results map to TMQ_RES_TABLE_META,
 * except when their resMsgType is TDMT_VND_DELETE, which is reported as
 * TMQ_RES_DATA. Metadata results map to TMQ_RES_METADATA. Anything else is
 * TMQ_RES_INVALID.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* metaObj = (SMqMetaRspObj*)res;
    return (metaObj->metaRsp.resMsgType == TDMT_VND_DELETE) ? TMQ_RES_DATA : TMQ_RES_TABLE_META;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }

  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1805
/*
 * Return the topic name of a tmq result with everything up to and including
 * the first '.' stripped.
 *
 * If the stored name contains no '.', the whole stored name is returned
 * (previously this case performed arithmetic on a NULL pointer).
 *
 * @return pointer into the result object's topic field, or NULL when res is
 *         not a tmq data, meta or metadata result.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fname = NULL;

  if (TD_RES_TMQ(res)) {
    fname = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fname = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fname = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fname, '.');
  return (dot != NULL) ? dot + 1 : fname;
}

L
Liu Jicong 已提交
1820 1821 1822 1823
/*
 * Return the database name of a tmq result with everything up to and
 * including the first '.' stripped.
 *
 * If the stored name contains no '.', the whole stored name is returned
 * (previously this case performed arithmetic on a NULL pointer).
 *
 * @return pointer into the result object's db field, or NULL when res is not
 *         a tmq data, meta or metadata result.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fname = NULL;

  if (TD_RES_TMQ(res)) {
    fname = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fname = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fname = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fname, '.');
  return (dot != NULL) ? dot + 1 : fname;
}

L
Liu Jicong 已提交
1835 1836 1837 1838
/*
 * Return the vgId stored in a tmq result object, or -1 when res is not a tmq
 * data, meta or metadata result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1849 1850 1851 1852 1853 1854 1855 1856

/*
 * Return the table name of the block the result iterator currently points at.
 *
 * A name is returned only for data and metadata results that were produced
 * with table names (withTbName set, blockTbName present) and whose resIter
 * lies in [0, blockNum). In every other case, including meta results, NULL is
 * returned.
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* dataObj = (SMqRspObj*)res;
    if (dataObj->rsp.withTbName && dataObj->rsp.blockTbName != NULL && dataObj->resIter >= 0 &&
        dataObj->resIter < dataObj->rsp.blockNum) {
      return (const char*)taosArrayGetP(dataObj->rsp.blockTbName, dataObj->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* taosxObj = (SMqTaosxRspObj*)res;
    if (taosxObj->rsp.withTbName && taosxObj->rsp.blockTbName != NULL && taosxObj->resIter >= 0 &&
        taosxObj->resIter < taosxObj->rsp.blockNum) {
      return (const char*)taosArrayGetP(taosxObj->rsp.blockTbName, taosxObj->resIter);
    }
    return NULL;
  }

  return NULL;
}
1868

L
Liu Jicong 已提交
1869
/*
 * Commit offsets through tmqCommitInner and report completion through cb.
 * The call passes 0 and 1 as the third and fourth arguments
 * (NOTE(review): presumably the fourth selects asynchronous mode - confirm
 * against tmqCommitInner). cb and param are forwarded unchanged; the return
 * value of tmqCommitInner is discarded.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

1874 1875
/*
 * Commit offsets through tmqCommitInner with no callback and return its
 * result code. The call passes 0 and 0 as the third and fourth arguments
 * (NOTE(review): presumably the fourth selects synchronous mode - confirm
 * against tmqCommitInner).
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}