clientTmq.c 54.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
31 32 33
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
L
Liu Jicong 已提交
34 35 36
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
37

L
Liu Jicong 已提交
38 39 40 41 42 43
// Generic view of a queued response item. SMqAskEpRspWrapper and
// SMqPollRspWrapper begin with the same two fields, and tmqClearUnhandleMsg()
// handles queue items through this type.
typedef struct {
  int8_t  tmqRspType;  // presumably tags which concrete wrapper this item is — confirm against the poll path
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
44 45 46
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
47 48
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
49
struct tmq_list_t {
L
Liu Jicong 已提交
50
  SArray container;
L
Liu Jicong 已提交
51
};
L
Liu Jicong 已提交
52

L
Liu Jicong 已提交
53
struct tmq_conf_t {
54 55 56 57 58
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
59 60
  int8_t  snapEnable;
  int32_t snapBatchSize;
61 62 63

  bool hbBgEnable;

64 65 66 67 68
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
69
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
70
  void*          commitCbUserParam;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
74
  int64_t refId;
L
Liu Jicong 已提交
75
  // conf
76 77 78 79 80 81 82 83 84 85 86
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
87 88
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
89 90 91 92

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
93 94
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
95
  int32_t epSkipCnt;
L
Liu Jicong 已提交
96
#endif
L
Liu Jicong 已提交
97 98
  int64_t pollCnt;

L
Liu Jicong 已提交
99
  // timer
100 101
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
102 103 104
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
105 106 107 108
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
109
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
110
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
111
  STaosQall*  qall;
L
Liu Jicong 已提交
112 113 114 115
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
116 117
};

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
// Vgroup status constants; presumably the values stored in
// SMqClientVg.vgStatus — confirm against the poll path.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status constants; presumably the values stored in tmq_t.status —
// confirm against the subscribe/poll code.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
130
// Task kinds written to tmq_t.delayedTask by the timer callbacks and
// dispatched in tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
136
typedef struct {
137 138 139
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
142
  // connection info
143
  int32_t vgId;
X
Xiaoyu Wang 已提交
144
  int32_t vgStatus;
L
Liu Jicong 已提交
145
  int32_t vgSkipCnt;
146 147 148
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
149
typedef struct {
150
  // subscribe info
151 152
  char topicName[TSDB_TOPIC_FNAME_LEN];
  char db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
153 154 155

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
156
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
168
  };
L
Liu Jicong 已提交
169 170
} SMqPollRspWrapper;

L
Liu Jicong 已提交
171
typedef struct {
172 173
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
174 175
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
176
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
177

L
Liu Jicong 已提交
178
typedef struct {
179 180
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
181
  int32_t code;
L
Liu Jicong 已提交
182
  int32_t async;
X
Xiaoyu Wang 已提交
183
  tsem_t  rspSem;
184 185
} SMqAskEpCbParam;

L
Liu Jicong 已提交
186
typedef struct {
187 188
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
189
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
190
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
191
  int32_t         vgId;
L
Liu Jicong 已提交
192
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
193
} SMqPollCbParam;
194

195
typedef struct {
196 197
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
198 199
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
200 201
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
202
  int32_t        rspErr;
203
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
204 205 206 207
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
208 209 210 211 212
} SMqCommitCbParamSet;

// Per-request context handed to tmqCommitCb().
typedef struct {
  SMqCommitCbParamSet* params;   // shared state of the enclosing commit call
  STqOffset*           pOffset;  // owned; freed in tmqCommitCb()
} SMqCommitCbParam;
214

215
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
216
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
217
  conf->withTbName = false;
L
Liu Jicong 已提交
218
  conf->autoCommit = true;
L
Liu Jicong 已提交
219
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
220
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
221
  conf->hbBgEnable = true;
222 223 224
  return conf;
}

L
Liu Jicong 已提交
225
/**
 * Release a configuration together with its owned ip/user/pass strings.
 * A NULL conf is ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }
  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
235 236
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
237
    return TMQ_CONF_OK;
238
  }
L
Liu Jicong 已提交
239

240 241
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
242 243
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
244

L
Liu Jicong 已提交
245 246
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
247
      conf->autoCommit = true;
L
Liu Jicong 已提交
248 249
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
250
      conf->autoCommit = false;
L
Liu Jicong 已提交
251 252 253 254
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
255
  }
L
Liu Jicong 已提交
256

L
Liu Jicong 已提交
257 258 259 260 261
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274 275
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
276

277 278
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
279
      conf->withTbName = true;
L
Liu Jicong 已提交
280
      return TMQ_CONF_OK;
281
    } else if (strcmp(value, "false") == 0) {
282
      conf->withTbName = false;
L
Liu Jicong 已提交
283
      return TMQ_CONF_OK;
284 285 286 287 288
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
289
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
290
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
291
      conf->snapEnable = true;
292 293
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
294
      conf->snapEnable = false;
295 296 297 298 299 300
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
301 302 303 304 305
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

306 307 308
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
309 310
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
311
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
312 313 314 315
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
316
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
317 318
  }

L
Liu Jicong 已提交
319
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
320 321 322
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
323
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
324 325 326
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
327
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
328 329 330
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
331
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
332 333 334
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
335
  if (strcmp(key, "td.connect.db") == 0) {
336
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
337 338 339
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
340
  return TMQ_CONF_UNKNOWN;
341 342 343
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
344 345
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
346 347
}

L
Liu Jicong 已提交
348 349
/**
 * Append a copy of a topic name to the list. Names that do not start with a
 * backtick are lower-cased.
 *
 * @return 0 on success, -1 when list is NULL, src is NULL or empty, or an
 *         allocation fails.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;
  SArray* container = &list->container;

  char* topic = strdup(src);
  if (topic == NULL) return -1;
  if (topic[0] != '`') {
    strtolower(topic, src);
  }
  if (taosArrayPush(container, &topic) == NULL) {
    // The list did not take ownership, so release the copy here.
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
359
/**
 * Destroy the list, freeing each stored topic string with taosMemoryFree.
 */
void tmq_list_destroy(tmq_list_t* list) {
  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
364 365 366 367 368 369 370 371 372 373
// Number of topics currently held in the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Expose the list's backing storage as an array of C strings. The pointer is
// the container's own pData, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
374 375 376 377
// Write "<topicName>:<vg>" into dst and return the number of characters
// written, excluding the terminator. dst must be large enough; the write is
// not bounded.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
/**
 * Finish a commit once no responses are outstanding.
 *
 * Async commits: invoke the automatic or user callback with rspErr, then
 * release pParamSet. Sync commits: wake the caller blocked on rspSem; that
 * caller owns pParamSet and its semaphore.
 *
 * @return 0 on success; -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED when
 *         the consumer can no longer be resolved from refId.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // NOTE(review): the reference acquired here is not released in this function.
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId);
  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
      taosMemoryFree(pParamSet);
    } else {
      // A sync caller is (or will be) waiting on rspSem and frees pParamSet
      // itself. Report the failure and wake it instead of freeing under it.
      pParamSet->rspErr = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParamSet->rspSem);
    }
    return -1;
  }

  // if no more waiting rsp
  if (pParamSet->async) {
    // call async cb func
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    // Nobody waits on the semaphore in async mode; destroy it before freeing.
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

#if 0
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
  return 0;
}

410 411
/**
 * Response callback for one commit-offset request. Frees the request's
 * offset and the response payload, decrements the shared waitingRspNum, and
 * calls tmqCommitDone() when it reaches zero.
 *
 * NOTE(review): `code` is not recorded anywhere here, so a failed request
 * does not change pParamSet->rspErr.
 */
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
  // push into array
#if 0
  if (code == 0) {
    taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
  } else {
    taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
  }
#endif

  taosMemoryFree(pParam->pOffset);
  if (pBuf->pData) taosMemoryFree(pBuf->pData);

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }
  return 0;
}

L
Liu Jicong 已提交
438 439 440 441 442 443
/**
 * Build and send one commit-offset request for a vgroup of a topic.
 *
 * The shared counters in pParamSet are only incremented immediately before
 * the message is sent, so when this returns -1 pParamSet is untouched and
 * nothing allocated here is left behind.
 *
 * @return 0 once the request has been handed to the transport, -1 on
 *         encoding or allocation failure.
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  void*             buf = NULL;
  SMqCommitCbParam* pParam = NULL;
  SMsgSendInfo*     pMsgSendInfo = NULL;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffset;

  // subKey is "<groupId><TMQ_SEPARATOR><topicName>"
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }
  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // send msg
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  // Nothing has been handed to the transport yet, so everything is still ours.
  if (pParam) taosMemoryFree(pParam);
  if (buf) taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

/**
 * Commit the offset of the single topic/vgroup that `msg` was polled from.
 *
 * @param async    non-zero: return right after sending, completion is
 *                 reported through userCb; zero: block until the response
 *                 arrives
 * @return TSDB_CODE_TMQ_INVALID_MSG when msg is not a TMQ result; -1 when
 *         allocation fails; in sync mode the commit result (or -1 when the
 *         request could not be sent); otherwise 0.
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = -1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
        }
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    // Nothing needed committing.
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    // tmqCommitDone() only posts the semaphore in sync mode; the param set is
    // ours to release.
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return code;
  }
  // Async: tmqCommitDone() releases pParamSet after running the callback.
  return 0;

FAIL:
  // The request was never sent, so no callback will ever touch pParamSet.
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  if (async) {
    if (userCb != NULL) userCb(tmq, code, userParam);
    return 0;
  }
  return code;
}

L
Liu Jicong 已提交
580 581
/**
 * Commit consumer offsets.
 *
 * With a non-NULL msg the work is delegated to tmqCommitMsgImpl(). Otherwise
 * a commit request is sent for every vgroup of every subscribed topic whose
 * current offset differs from its committed offset.
 *
 * @param automatic non-zero when triggered by the auto-commit timer;
 *                  completion then goes to tmq->commitCb instead of userCb
 * @param async     zero to block until all responses have arrived
 * @return -1 when allocation fails; in sync mode the commit result;
 *         otherwise 0.
 */
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  int32_t code = 0;

  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue: responses arriving while we are
  // still sending cannot drive the counter to zero.
  pParamSet->waitingRspNum = 1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
      }
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // Nothing was sent, so no callback will run.
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // Drop the guard count taken above; if every response is already in, finish here.
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }

  if (!async) {
    // Sync mode: tmqCommitDone() only posts, so pParamSet is still valid and ours.
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
  }
  // Async mode: pParamSet belongs to tmqCommitDone(); do not touch it here.

#if 0
  if (!async) {
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
  }
#endif

  return code;
}

667
/**
 * Timer callback: queue an ASK_EP delayed task for the consumer identified
 * by *param (an int64_t refId) and wake it. Always frees param.
 */
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  // NOTE(review): the reference acquired here is not released in this function.
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // On allocation failure the task is dropped rather than writing through NULL.
  }
  taosMemoryFree(param);
}

/**
 * Timer callback: queue a COMMIT delayed task for the consumer identified by
 * *param (an int64_t refId) and wake it. Always frees param.
 */
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  // NOTE(review): the reference acquired here is not released in this function.
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = TMQ_DELAYED_TASK__COMMIT;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // On allocation failure the task is dropped rather than writing through NULL.
  }
  taosMemoryFree(param);
}

/**
 * Timer callback: queue a REPORT delayed task for the consumer identified by
 * *param (an int64_t refId) and wake it. Always frees param.
 */
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  // NOTE(review): the reference acquired here is not released in this function.
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = TMQ_DELAYED_TASK__REPORT;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // On allocation failure the task is dropped rather than writing through NULL.
  }
  taosMemoryFree(param);
}

703 704 705 706 707 708
// Heartbeat response callback: the payload is not inspected, only released.
// param and code are unused. Always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL && pMsg->pData != NULL) {
    taosMemoryFree(pMsg->pData);
  }
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
709 710 711 712 713
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    return;
  }
714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
743
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
744 745
}

L
Liu Jicong 已提交
746 747 748 749 750 751 752 753
/**
 * Drain tmq->delayedTask and run every queued task:
 *  - ASK_EP: call tmqAskEp() asynchronously and re-arm epTimer (1000 ms)
 *  - COMMIT: start an automatic async commit and re-arm commitTimer
 *            (autoCommitInterval)
 *  - REPORT: nothing to do
 *
 * @return 0 on success, -1 when the temporary item list cannot be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    return -1;
  }
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);

      // The timer callback takes ownership of pRefId and frees it. If the
      // allocation fails the timer is not re-armed.
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
778
/**
 * Discard every pending response: first the items already moved into
 * tmq->qall, then whatever is still sitting in tmq->mqueue.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* msg = NULL;

  while (1) {
    // Reset before each fetch so an empty fetch can never leave the pointer
    // of the item freed on the previous iteration in place.
    msg = NULL;
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg == NULL) break;
    taosFreeQitem(msg);
  }

  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    msg = NULL;
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg == NULL) break;
    taosFreeQitem(msg);
  }
}

D
dapan1121 已提交
799
/**
 * Subscribe response callback: store the response code in the waiter's
 * SMqSubscribeCbParam and post its semaphore. pMsg is not used. Always
 * returns 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  tsem_post(&pParam->rspSem);
  return 0;
}
805

L
Liu Jicong 已提交
806
/**
 * Append the consumer's subscribed topic names to *topics, creating the list
 * when *topics is NULL. Each name is reported without the prefix that ends
 * at the first '.'; a name with no '.' is reported unchanged.
 *
 * @return 0 on success, -1 when a new list cannot be allocated.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return -1;
    }
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    // strchr() returns NULL when there is no '.', so never add 1 blindly.
    const char* dot = strchr(topic->topicName, '.');
    tmq_list_append(*topics, dot != NULL ? dot + 1 : topic->topicName);
  }
  return 0;
}

L
Liu Jicong 已提交
817
/**
 * Unsubscribe by subscribing to an empty topic list. While the result is
 * TSDB_CODE_MND_CONSUMER_NOT_READY the call is retried after a 500 ms sleep,
 * until the retry counter exceeds 5.
 *
 * @return the result of the last tmq_subscribe() call.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  int32_t     rsp;
  int32_t     retryCnt = 0;
  tmq_list_t* lst = tmq_list_new();
  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    } else {
      retryCnt++;
      taosMsleep(500);
    }
  }

  tmq_list_destroy(lst);
  return rsp;
}

835 836
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
837

838 839 840 841
  // TODO stop timer
  if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
  if (tmq->qall) taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
842

843
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
844

845 846 847
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
848
    if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
849 850 851 852 853 854
    int32_t vgSz = taosArrayGetSize(pTopic->vgs);
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
855 856
}

L
Liu Jicong 已提交
857
/*
 * Create a consumer from `conf`.
 *
 * On first use this also initialises the shared TMQ timer and reference set.
 * The consumer gets its own queues, a response semaphore, a generated consumer
 * id and a connection opened with the user/password from conf (falling back to
 * the defaults). When conf->hbBgEnable is set a background heartbeat timer is
 * started as well.
 *
 * conf:              consumer configuration; groupId must be non-empty
 * errstr, errstrLen: not written by this implementation
 *
 * Returns the new consumer, or NULL on failure.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here, so only values taken from conf may be logged
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  // The heartbeat timer is the only user of this buffer, so it is allocated
  // only when the timer will be started, and before the consumer is published
  // in the reference set so that a failure can still be undone directly.
  int64_t* pRefId = NULL;
  if (pTmq->hbBgEnable) {
    pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tmqFreeImpl(pTmq);
      return NULL;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    if (pRefId != NULL) taosMemoryFree(pRefId);
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pRefId != NULL) {
    *pRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  // only reached before the semaphore/connection exist (or after they were undone)
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
962
/*
 * Subscribe the consumer to the topics in topic_list (an empty list is what
 * tmq_unsubscribe() sends).
 *
 * Steps: build the full topic names, serialize and send the subscribe request,
 * block until tmqSubscribeCb() reports the result, then poll tmqAskEp() until
 * the consumer is ready, and finally start the ask-ep timer and, when auto
 * commit is enabled, the commit timer (each only if not already running).
 *
 * Returns 0 on success, otherwise a non-zero code:
 *   - the code reported for the subscribe request,
 *   - TSDB_CODE_MND_CONSUMER_NOT_READY when the consumer is still not ready
 *     after the retries,
 *   - TSDB_CODE_OUT_OF_MEMORY when a timer parameter cannot be allocated,
 *   - -1 for failures while the request is being prepared.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    // the array stores the pointer; the strings are freed with the array at FAIL
    taosArrayPush(req.topicNames, &topicFName);
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // nothing has been handed to the transport yet, so sendInfo is still ours
    taosMemoryFree(sendInfo);
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;

  // param lives on this stack frame, so wait for the callback before going on
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // code is 0 at this point; report the timeout instead of success
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  // shared exit for success and failure
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1068
/*
 * Store the commit callback and its user parameter in the configuration;
 * tmq_consumer_new() copies both into the consumer it creates.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1073

D
dapan1121 已提交
1074
/*
 * Completion callback for a poll request.
 *
 * param: SMqPollCbParam allocated by the sender; always freed here
 * pMsg:  response buffers; pData and pEpSet are released here on every path
 * code:  result of the request
 *
 * On success the response is decoded into a SMqPollRspWrapper, pushed onto
 * tmq->mqueue and tmq->rspSem is posted. Responses from an epoch older than
 * the consumer's current one are dropped. On failure the vgroup is put back to
 * idle (only if the request's epoch is still current) and -1 is returned.
 *
 * NOTE(review): the reference taken with taosAcquireRef() is not released
 * anywhere in this function -- confirm whether a matching release is expected.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    // consumer already closed: nothing to deliver to, just release everything
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
  taosMemoryFree(pParam);
  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    // the payload is not read on any error path: release both buffers once, here
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pEndRsp = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pEndRsp == NULL) {
        // pData was already released above; freeing it again would be a double free
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pEndRsp->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pEndRsp);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // In every branch the body after the SMqRspHead is decoded first, then the
  // head itself is copied over the start of the decoded struct.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(0);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;
CREATE_MSG_FAIL:
  // only touch pVg while the request's epoch is still the consumer's epoch
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

L
Liu Jicong 已提交
1188
/*
 * Replace the consumer's topic/vgroup table with the one carried by pRsp and
 * move the consumer to `epoch`.
 *
 * The current offset of every known (topic, vgId) pair is remembered in a
 * temporary hash keyed "<topicName>:<vgId>", so a vgroup that is still
 * assigned keeps its offset; a new vgroup starts from tmq->resetOffsetCfg.
 * Schema ownership is moved from pRsp into the new topic entries. The old
 * table is freed, and the status becomes NO_TOPIC or READY depending on
 * whether any topic is left.
 *
 * Returns true when at least one vgroup was installed, false otherwise
 * (including allocation failure of the new table or the hash, in which case
 * nothing is changed).
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // remember the offsets of the vgroups we currently know
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  // build the new table from the response
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    // take over the schema so it is not freed together with the response
    topic.schema = pTopicEp->schema;
    pTopicEp->schema.nCols = 0;
    pTopicEp->schema.pSchema = NULL;
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // free the old table
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
      taosArrayDestroy(pTopic->vgs);
    }
    taosArrayDestroy(tmq->clientTopics);
  }
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1282
/*
 * Completion callback for the ask-ep request sent by tmqAskEp().
 *
 * param: SMqAskEpCbParam. In sync mode (async == 0) it belongs to the thread
 *        blocked in tmqAskEp(), which is woken through rspSem and frees it; in
 *        async mode it is destroyed here.
 * pMsg:  response buffers; released here on every path
 * code:  result of the request; stored in pParam->code for the sync waiter
 *
 * A response whose epoch is not newer than the consumer's is ignored. A newer
 * one is applied directly with tmqUpdateEp() in sync mode, or queued on
 * tmq->mqueue as a TMQ_MSG_TYPE__EP_RSP item in async mode.
 *
 * Returns `code`, or -1 when the consumer is gone or the queue item cannot be
 * allocated.
 *
 * NOTE(review): the reference taken with taosAcquireRef() is not released
 * anywhere in this function -- confirm whether a matching release is expected.
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    if (!async) {
      // a thread is blocked on rspSem: hand it the error and wake it up rather
      // than destroying the semaphore underneath it
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      tsem_destroy(&pParam->rspSem);
      taosMemoryFree(pParam);
    }
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  // `head` points into pData, so the buffers are released only down here
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  if (!async) {
    // the waiter frees pParam once woken: do not touch it after this post
    tsem_post(&pParam->rspSem);
  } else {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1348
/*
 * Ask the server for the consumer's current topic/vgroup assignment.
 *
 * async == false: block until tmqAskEpCb() has handled the response and
 *                 return the code it stored.
 * async == true:  return right after the request is sent; the callback owns
 *                 and releases the parameter block.
 *
 * Returns 0 (async, or sync success), the callback's code (sync), or -1 when
 * the request cannot be prepared.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = 0;

  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  // bounded copy: never write past the fixed-size field of the request
  tstrncpy(req->cgroup, tmq->groupId, sizeof(req->cgroup));

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // use the local flag: in async mode the callback may already have freed pParam
  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

1418
/*
 * Allocate and fill a poll request for one vgroup of one topic.
 *
 * The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". The request
 * carries the consumer's id, epoch and flags, the vgroup's current offset and
 * a freshly generated request id; vgId and contLen in the head are stored in
 * network byte order.
 *
 * Returns the request (owned by the caller) or NULL when allocation fails.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pollReq == NULL) return NULL;

  // subKey: group id, separator, topic name (strcpy adds the terminator)
  int32_t groupLen = strlen(tmq->groupId);
  char*   cursor = pollReq->subKey;
  memcpy(cursor, tmq->groupId, groupLen);
  cursor += groupLen;
  *cursor = TMQ_SEPARATOR;
  cursor += 1;
  strcpy(cursor, pTopic->topicName);

  // values taken from the consumer
  pollReq->withTbName = tmq->withTbName;
  pollReq->timeout = timeout;
  pollReq->consumerId = tmq->consumerId;
  pollReq->epoch = tmq->epoch;

  // where to continue reading in this vgroup
  pollReq->reqOffset = pVg->currentOffset;
  pollReq->reqId = generateRequestId();

  pollReq->useSnapshot = tmq->useSnapshot;

  // message head
  pollReq->head.vgId = htonl(pVg->vgId);
  pollReq->head.contLen = htonl(sizeof(SMqPollReq));

  return pollReq;
}

L
Liu Jicong 已提交
1447 1448
/*
 * Build the result object for a meta response from a queued wrapper.
 *
 * Topic name, db name and vgId come from the wrapper's topic/vgroup handles;
 * the decoded SMqMetaRsp is copied bytewise, so anything it points to is
 * shared with the wrapper rather than duplicated.
 *
 * Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when it
 * cannot be allocated.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1458 1459 1460
/*
 * Build the result object for a data response from a queued wrapper.
 *
 * Topic name, db name and vgId come from the wrapper's topic/vgroup handles;
 * the decoded SMqDataRsp is copied bytewise, so anything it points to is
 * shared with the wrapper rather than duplicated. The block iterator starts at
 * -1. When the response carries no schema of its own, the topic's schema is
 * installed in the result info.
 *
 * Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when it
 * cannot be allocated.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1476 1477
/*
 * Build the result object for a taosx (meta + data) response from a queued
 * wrapper.
 *
 * Topic name, db name and vgId come from the wrapper's topic/vgroup handles;
 * the decoded STaosxRsp is copied bytewise, so anything it points to is shared
 * with the wrapper rather than duplicated. The block iterator starts at -1.
 * When the response carries no schema of its own, the topic's schema is
 * installed in the result info.
 *
 * Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when it
 * cannot be allocated.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1494
/**
 * Send one consume (poll) request to every vgroup of every subscribed topic
 * that is currently idle.
 *
 * A vgroup is claimed by atomically switching its status from IDLE to WAIT;
 * vgroups that are not idle are skipped and their skip counter is bumped.
 *
 * @param tmq      consumer handle
 * @param timeout  forwarded to tmqBuildConsumeReqImpl when building the request
 * @return 0 when the sweep over all topics/vgroups finished, -1 as soon as a
 *         request or one of its companion structures could not be allocated
 *         (the vgroup is put back to IDLE and rspSem is posted first).
 */
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // Claim the vgroup; anything other than IDLE means it cannot be polled now.
      if (atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT) !=
          TMQ_VG_STATUS__IDLE) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 skipped);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      SMqPollCbParam* pCbParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pCbParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pCbParam->refId = tmq->refId;
      pCbParam->epoch = tmq->epoch;
      pCbParam->pVg = pVg;
      pCbParam->pTopic = pTopic;
      pCbParam->vgId = pVg->vgId;

      SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (pSendInfo == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pCbParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      pSendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      pSendInfo->requestId = pReq->reqId;
      pSendInfo->requestObjRefId = 0;
      pSendInfo->param = pCbParam;
      pSendInfo->fp = tmqPollCb;
      pSendInfo->msgType = TDMT_VND_CONSUME;

      int64_t transporterId = 0;

      char offsetText[80];
      tFormatOffset(offsetText, sizeof(offsetText), &pVg->currentOffset);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetText, pReq->reqId);

      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pSendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
1572 1573
/**
 * Handle a queued response that is not a poll response.
 *
 * Only endpoint (EP) responses are understood here. When the response carries
 * an epoch newer than the consumer's current one, the consumer's endpoint
 * information is refreshed through tmqUpdateEp.
 *
 * @param tmq         consumer handle
 * @param rspWrapper  queued response; treated as SMqAskEpRspWrapper when its
 *                    type is TMQ_MSG_TYPE__EP_RSP
 * @param pReset      out: true when the endpoints were updated, false when the
 *                    response was not newer; left untouched on -1
 * @return 0 for an EP response, -1 for any other response type
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewerEpoch = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewerEpoch) {
    SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tmqUpdateEp(tmq, rspWrapper->epoch, &pEpWrapper->msg);
  }

  *pReset = isNewerEpoch;
  return 0;
}

L
Liu Jicong 已提交
1590
/**
 * Drain queued responses until one can be turned into a result object.
 *
 * Items are taken from tmq->qall; when that is empty it is refilled once from
 * tmq->mqueue. Poll-type responses whose epoch differs from the consumer's
 * current epoch are discarded, as are data/taosx responses with zero blocks.
 *
 * @param tmq          consumer handle
 * @param timeout      forwarded to tmqPollImpl when a re-poll is triggered
 * @param pollIfReset  when true, an endpoint reset triggers an immediate re-poll
 * @return a response object built from the first usable response, or NULL when
 *         no response is queued or an END response was seen (in which case
 *         terrno is set to TSDB_CODE_TQ_NO_COMMITTED_OFFSET)
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
      // Local batch exhausted: pull everything pending from the main queue and retry once.
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) {
        return NULL;
      }
    }

    const int8_t rspType = rspWrapper->tmqRspType;

    if (rspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPollWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPollWrapper->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPollWrapper->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(pPollWrapper);
        continue;
      }

      // Record the new offset and release the vgroup for the next poll.
      SMqClientVg* pVg = pPollWrapper->vgHandle;
      pVg->currentOffset = pPollWrapper->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPollWrapper->dataRsp.blockNum == 0) {
        taosFreeQitem(pPollWrapper);
        continue;
      }

      SMqRspObj* pDataRsp = tmqBuildRspFromWrapper(pPollWrapper);
      taosFreeQitem(pPollWrapper);
      return pDataRsp;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPollWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPollWrapper->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPollWrapper->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(pPollWrapper);
        continue;
      }

      SMqClientVg* pVg = pPollWrapper->vgHandle;
      pVg->currentOffset = pPollWrapper->metaRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaRsp = tmqBuildMetaRspFromWrapper(pPollWrapper);
      taosFreeQitem(pPollWrapper);
      return pMetaRsp;
    }

    if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pPollWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPollWrapper->taosxRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPollWrapper->taosxRsp.head.epoch, curEpoch);
        taosFreeQitem(pPollWrapper);
        continue;
      }

      SMqClientVg* pVg = pPollWrapper->vgHandle;
      pVg->currentOffset = pPollWrapper->taosxRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPollWrapper->taosxRsp.blockNum == 0) {
        taosFreeQitem(pPollWrapper);
        continue;
      }

      // Without created tables the response is exposed as a plain data result.
      void* pTaosxRsp = (pPollWrapper->taosxRsp.createTableNum == 0)
                            ? (void*)tmqBuildRspFromWrapper(pPollWrapper)
                            : (void*)tmqBuildTaosxRspFromWrapper(pPollWrapper);
      taosFreeQitem(pPollWrapper);
      return pTaosxRsp;
    }

    // Everything else goes through the non-poll handler (endpoint updates).
    bool reset = false;
    tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
    taosFreeQitem(rspWrapper);
    if (pollIfReset && reset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1689
/**
 * Poll the consumer for the next result.
 *
 * @param tmq      consumer handle
 * @param timeout  overall budget, compared against taosGetTimestampMs()
 *                 differences (i.e. milliseconds); -1 means no deadline
 * @return a result object, or NULL when the consumer is still in INIT status,
 *         is not ready after the recover retries, a poll request could not be
 *         issued, terrno reports TSDB_CODE_TQ_NO_COMMITTED_OFFSET, or the
 *         timeout elapsed without a response.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  // A consumer that is still in INIT status has nothing to poll.
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsedTime = taosGetTimestampMs() - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait only for what is left of the budget. The previous code waited for
      // the time already elapsed, so the wait grew with each iteration and the
      // call could overshoot the requested timeout.
      // NOTE(review): the "* 1000" scaling is kept as-is from the original
      // call; confirm the unit tsem_timewait expects.
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1744
/**
 * Close a consumer.
 *
 * When the consumer is in READY status, offsets are committed synchronously
 * and the consumer then subscribes to an empty topic list, retrying a few
 * times while the server answers TSDB_CODE_MND_CONSUMER_NOT_READY. The outcome
 * of that subscribe is not propagated to the caller.
 *
 * In every non-error path the consumer's reference is removed from the tmq
 * reference set. Previously the READY path returned before reaching
 * taosRemoveRef, so a consumer closed while READY never dropped its reference.
 *
 * @param tmq  consumer handle
 * @return 0 on success, or the non-zero code from tmq_commit_sync when the
 *         final commit fails (the reference is not removed in that case)
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      } else {
        retryCnt++;
        taosMsleep(500);
      }
    }

    tmq_list_destroy(lst);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1771

L
Liu Jicong 已提交
1772 1773
/**
 * Translate a tmq error code into a human-readable string.
 *
 * @param err  0, -1, or any other code understood by tstrerror
 * @return "success" for 0, "fail" for -1, otherwise whatever tstrerror returns
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1781

L
Liu Jicong 已提交
1782 1783 1784 1785
/**
 * Classify a result object returned by the consumer.
 *
 * @param res  result object
 * @return TMQ_RES_DATA for data results and for meta results whose message
 *         type is TDMT_VND_DELETE, TMQ_RES_TABLE_META for other meta results,
 *         TMQ_RES_METADATA for metadata results, TMQ_RES_INVALID otherwise
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }

  if (TD_RES_TMQ_META(res)) {
    // Delete messages arrive as meta results but are reported as data.
    const SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    return (pMeta->metaRsp.resMsgType == TDMT_VND_DELETE) ? TMQ_RES_DATA : TMQ_RES_TABLE_META;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }

  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1798
/**
 * Return the topic name of a tmq result, without the prefix that precedes the
 * first '.' in the stored name.
 *
 * @param res  result object (data, meta or metadata result)
 * @return pointer into the result's own topic buffer, just past the first '.';
 *         the whole stored name when it contains no '.'; NULL when res is not
 *         a tmq result
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr returns NULL when there is no '.'; adding 1 to that would be
  // undefined behaviour and hand the caller a bogus pointer.
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1813 1814 1815 1816
/**
 * Return the database name of a tmq result, without the prefix that precedes
 * the first '.' in the stored name.
 *
 * @param res  result object (data, meta or metadata result)
 * @return pointer into the result's own db buffer, just past the first '.';
 *         the whole stored name when it contains no '.'; NULL when res is not
 *         a tmq result
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr returns NULL when there is no '.'; adding 1 to that would be
  // undefined behaviour and hand the caller a bogus pointer.
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1828 1829 1830 1831
/**
 * Return the vgroup id stored in a tmq result.
 *
 * @param res  result object (data, meta or metadata result)
 * @return the result's vgId, or -1 when res is not a tmq result
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1842 1843 1844 1845 1846 1847 1848 1849

/**
 * Return the table name of the block the result iterator currently points at.
 *
 * @param res  data or metadata result object
 * @return the entry of rsp.blockTbName at index resIter, or NULL when res is
 *         another kind of result, the response carries no table names, or the
 *         iterator is outside [0, blockNum)
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       hasName = pData->rsp.withTbName && pData->rsp.blockTbName != NULL && pData->resIter >= 0 &&
                   pData->resIter < pData->rsp.blockNum;
    if (hasName) {
      return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    bool            hasName = pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL && pTaosx->resIter >= 0 &&
                   pTaosx->resIter < pTaosx->rsp.blockNum;
    if (hasName) {
      return (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter);
    }
    return NULL;
  }

  return NULL;
}
1861

L
Liu Jicong 已提交
1862
/**
 * Commit offsets without returning a result to the caller.
 *
 * Thin wrapper over tmqCommitInner; its return value is discarded.
 *
 * @param tmq    consumer handle
 * @param msg    result forwarded to tmqCommitInner
 * @param cb     callback forwarded to tmqCommitInner
 * @param param  opaque pointer forwarded to tmqCommitInner alongside cb
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  // NOTE(review): the literal flags are presumably (automatic = 0, async = 1) -- confirm against tmqCommitInner.
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

1867 1868
/**
 * Commit offsets and return the outcome.
 *
 * Thin wrapper over tmqCommitInner with no callback.
 *
 * @param tmq  consumer handle
 * @param msg  result forwarded to tmqCommitInner (tmq_consumer_close passes NULL)
 * @return whatever tmqCommitInner returns
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  // NOTE(review): the literal flags are presumably (automatic = 0, async = 0) -- confirm against tmqCommitInner.
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}