tmq.c 53.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
31 32 33
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
L
Liu Jicong 已提交
34 35 36
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
37

L
Liu Jicong 已提交
38 39 40 41 42 43
// Common leading fields of the response wrappers in this file
// (SMqAskEpRspWrapper and SMqPollRspWrapper start with the same two members).
typedef struct {
  int8_t  tmqRspType;  // discriminates which concrete wrapper this is
  int32_t epoch;       // epoch value carried with the response
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
44 45 46
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
47 48
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
49
struct tmq_list_t {
L
Liu Jicong 已提交
50
  SArray container;
L
Liu Jicong 已提交
51
};
L
Liu Jicong 已提交
52

L
Liu Jicong 已提交
53
struct tmq_conf_t {
54 55 56 57 58
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
59 60
  int8_t  snapEnable;
  int32_t snapBatchSize;
61 62 63

  bool hbBgEnable;

64 65 66 67 68
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
69
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
70
  void*          commitCbUserParam;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
74
  int64_t refId;
L
Liu Jicong 已提交
75
  // conf
76 77 78 79 80 81 82 83 84 85 86
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
87 88
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
89 90 91 92

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
93 94
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
95
  int32_t epSkipCnt;
L
Liu Jicong 已提交
96
#endif
L
Liu Jicong 已提交
97 98
  int64_t pollCnt;

L
Liu Jicong 已提交
99
  // timer
100 101
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
102 103 104
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
105 106 107 108
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
109
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
110
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
111
  STaosQall*  qall;
L
Liu Jicong 已提交
112 113 114 115
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
116 117
};

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
// Values for the per-vgroup status field (SMqClientVg.vgStatus).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT = 1,
};

// Consumer status values.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY = 1,
  TMQ_CONSUMER_STATUS__NO_TOPIC = 2,
  TMQ_CONSUMER_STATUS__RECOVER = 3,
};

L
Liu Jicong 已提交
130
// Task kinds written to tmq_t.delayedTask by the timer callbacks and
// dispatched by tmqHandleAllDelayedTask. Numbering starts at 1.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT = 2,
  TMQ_DELAYED_TASK__COMMIT = 3,
};

L
Liu Jicong 已提交
136
typedef struct {
137 138 139
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
142
  // connection info
143
  int32_t vgId;
X
Xiaoyu Wang 已提交
144
  int32_t vgStatus;
L
Liu Jicong 已提交
145
  int32_t vgSkipCnt;
146 147 148
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
149
typedef struct {
150
  // subscribe info
151 152
  char topicName[TSDB_TOPIC_FNAME_LEN];
  char db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
153 154 155

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
156
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
168
  };
L
Liu Jicong 已提交
169 170
} SMqPollRspWrapper;

L
Liu Jicong 已提交
171
typedef struct {
172 173
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
174 175
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
176
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
177

L
Liu Jicong 已提交
178
typedef struct {
179 180
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
181
  int32_t code;
L
Liu Jicong 已提交
182
  int32_t async;
X
Xiaoyu Wang 已提交
183
  tsem_t  rspSem;
184 185
} SMqAskEpCbParam;

L
Liu Jicong 已提交
186
typedef struct {
187 188
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
189
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
190
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
191
  int32_t         vgId;
L
Liu Jicong 已提交
192
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
193
} SMqPollCbParam;
194

195
typedef struct {
196 197
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
198 199
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
200 201
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
202
  int32_t        rspErr;
203
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
204 205 206 207
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
208 209 210 211 212
} SMqCommitCbParamSet;

// Per-request context handed to tmqCommitCb.
typedef struct {
  SMqCommitCbParamSet* params;   // shared state of the enclosing commit call
  STqOffset*           pOffset;  // offset that was sent; freed by tmqCommitCb
} SMqCommitCbParam;
214

215
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
216
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
217
  conf->withTbName = false;
L
Liu Jicong 已提交
218
  conf->autoCommit = true;
L
Liu Jicong 已提交
219
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
220
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
221
  conf->hbBgEnable = true;
222 223 224
  return conf;
}

L
Liu Jicong 已提交
225
// Release a configuration together with its owned ip/user/pass strings.
// A NULL argument is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  if (conf->ip != NULL) {
    taosMemoryFree(conf->ip);
  }
  if (conf->user != NULL) {
    taosMemoryFree(conf->user);
  }
  if (conf->pass != NULL) {
    taosMemoryFree(conf->pass);
  }
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
235 236
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
237
    return TMQ_CONF_OK;
238
  }
L
Liu Jicong 已提交
239

240 241
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
242 243
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
244

L
Liu Jicong 已提交
245 246
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
247
      conf->autoCommit = true;
L
Liu Jicong 已提交
248 249
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
250
      conf->autoCommit = false;
L
Liu Jicong 已提交
251 252 253 254
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
255
  }
L
Liu Jicong 已提交
256

L
Liu Jicong 已提交
257 258 259 260 261
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274 275
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
276

277 278
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
279
      conf->withTbName = true;
L
Liu Jicong 已提交
280
      return TMQ_CONF_OK;
281
    } else if (strcmp(value, "false") == 0) {
282
      conf->withTbName = false;
L
Liu Jicong 已提交
283
      return TMQ_CONF_OK;
284 285 286 287 288
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
289
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
290
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
291
      conf->snapEnable = true;
292 293
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
294
      conf->snapEnable = false;
295 296 297 298 299 300
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
301 302 303 304 305
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

306 307 308
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
309 310
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
311
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
312 313 314 315
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
316
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
317 318
  }

L
Liu Jicong 已提交
319
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
320 321 322
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
323
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
324 325 326
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
327
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
328 329 330
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
331
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
332 333 334
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
335
  if (strcmp(key, "td.connect.db") == 0) {
336
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
337 338 339
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
340
  return TMQ_CONF_UNKNOWN;
341 342 343
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
344 345
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
346 347
}

L
Liu Jicong 已提交
348 349
// Append a copy of a topic name to the list.
//
// list  destination list
// src   topic name; names that do not start with a backtick are lower-cased
//
// Returns 0 on success, -1 when list is NULL, src is NULL or empty, or memory
// cannot be obtained. On failure the list is unchanged and nothing is leaked.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;
  SArray* container = &list->container;

  char* topic = strdup(src);
  if (topic == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (topic[0] != '`') {
    strtolower(topic, src);
  }
  if (taosArrayPush(container, &topic) == NULL) {
    // the array did not take ownership, so release the copy here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
359
// Destroy a topic list, releasing every stored element with taosMemoryFree.
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
364 365 366 367 368 369 370 371 372 373
// Number of topic names currently stored in the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Expose the list's backing storage as an array of C strings.
// The returned pointer is the array's own pData buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return (&list->container)->pData;
}

L
Liu Jicong 已提交
374 375 376 377
// Format "<topicName>:<vg>" into dst and return the number of characters
// written (excluding the terminator). dst must be large enough; the write is unbounded.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
// Finish a commit once no more responses are outstanding.
//
// Async commits: invoke the appropriate callback (tmq->commitCb for automatic
// commits, pParamSet->userCb otherwise) and free pParamSet.
// Sync commits: post pParamSet->rspSem; the waiting caller owns pParamSet.
//
// Returns 0 on success, or -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED when
// the consumer can no longer be acquired (pParamSet is freed in that case).
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // Read the id up front: after tsem_post a sync waiter may free pParamSet.
  int64_t refId = pParamSet->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    if (!pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
    }
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  if (pParamSet->async) {
    // call async cb func
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

  // Balance the taosAcquireRef above; without it the consumer is pinned forever.
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}

410 411
// Response callback for one offset-commit request.
//
// param  SMqCommitCbParam for this request (the struct itself is released by
//        the sender through paramFreeFp, not here)
// pBuf   response buffer; its payload is freed here
// code   result of the request
//
// Records a failing code in the shared param set so that the completion
// callback / sync caller sees it, then counts down the outstanding responses
// and runs tmqCommitDone when this was the last one. Always returns 0.
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  taosMemoryFree(pParam->pOffset);
  if (pBuf && pBuf->pData) taosMemoryFree(pBuf->pData);

  // Keep a failure visible; when several requests fail, any one of the codes is reported.
  if (code != 0) {
    atomic_store_32(&pParamSet->rspErr, code);
  }

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }
  return 0;
}
}

L
Liu Jicong 已提交
438 439 440 441 442 443
// Build and asynchronously send an offset-commit request for one vgroup.
//
// tmq        consumer issuing the commit
// pVg        vgroup whose currentOffset is committed
// pTopic     topic the vgroup belongs to
// pParamSet  shared state of the enclosing commit call; its waitingRspNum and
//            totalRspNum are incremented only once the request is about to be sent
//
// Returns 0 when the request was handed to the transport, -1 on failure. On
// failure every intermediate allocation is released and pParamSet is untouched.
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset*        pOffset = NULL;
  void*             buf = NULL;
  SMqCommitCbParam* pParam = NULL;
  SMsgSendInfo*     pMsgSendInfo = NULL;
  int32_t           len = 0;
  int32_t           code = 0;

  pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pOffset->val = pVg->currentOffset;

  // subKey is "<groupId><TMQ_SEPARATOR><topicName>"
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    goto FAIL;
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // Account for the response before sending so the callback never sees a negative count.
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  // NOTE(review): the send result is ignored; if a failed send never invokes
  // tmqCommitCb the counters above are never balanced — confirm transport behaviour.
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  if (pMsgSendInfo != NULL) taosMemoryFree(pMsgSendInfo);
  if (pParam != NULL) taosMemoryFree(pParam);
  if (buf != NULL) taosMemoryFree(buf);
  if (pOffset != NULL) taosMemoryFree(pOffset);
  return -1;
}

// Commit the offset of the single (topic, vgroup) that a polled message came from.
//
// tmq        consumer
// msg        polled result (data or meta response); must not be NULL
// async      non-zero: return immediately and report through userCb;
//            zero: block until the response arrives
// userCb     completion callback for async commits; may be NULL
// userParam  opaque argument forwarded to userCb
//
// Returns TSDB_CODE_TMQ_INVALID_MSG for an unsupported result type, -1 on
// allocation failure or (sync only) when the request could not be sent,
// the recorded response code for a sync commit, and 0 otherwise. An async
// send failure is reported as userCb(tmq, -1, userParam).
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // Find the first matching vgroup with an uncommitted offset and send one request for it.
  bool    attempted = false;
  int32_t sendCode = 0;
  for (int32_t i = 0; !attempted && i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        sendCode = tmqSendCommitReq(tmq, pVg, pTopic, pParamSet);
        attempted = true;
        break;
      }
    }
  }

  if (!attempted || sendCode < 0) {
    // Nothing is in flight, so no callback will ever reference pParamSet.
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    if (!attempted) return 0;  // nothing to commit
    if (async) {
      if (userCb != NULL) userCb(tmq, -1, userParam);
      return 0;
    }
    return -1;
  }

  if (async) {
    // tmqCommitDone owns and frees pParamSet from here on; it must not be touched again.
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

L
Liu Jicong 已提交
576 577
// Commit offsets.
//
// tmq        consumer
// msg        when non-NULL only the vgroup of this message is committed
//            (delegates to tmqCommitMsgImpl); when NULL every vgroup of every
//            subscribed topic with an uncommitted offset is committed
// automatic  non-zero for timer-driven commits (completion goes to tmq->commitCb)
// async      non-zero: do not wait for responses
// userCb     completion callback for non-automatic async commits
// userParam  opaque argument forwarded to userCb
//
// Returns -1 on allocation failure, the recorded response code for a sync
// commit, and 0 otherwise. Vgroups whose request cannot be built are skipped.
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue: responses cannot drive the count
  // to zero (and free pParamSet) while requests are still being issued
  pParamSet->waitingRspNum = 1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;  // best effort: keep committing the remaining vgroups
        }
      }
    }
  }

  if (pParamSet->totalRspNum == 0) {
    // nothing was sent, so nothing else references pParamSet
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // drop the guard count taken above
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }

  if (async) {
    // completion is reported through the callback; pParamSet belongs to tmqCommitDone
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

663
// Timer callback: queue an ASK_EP delayed task on the consumer and wake it.
//
// param  heap-allocated int64_t holding the consumer refId; freed here
// tmrId  unused
//
// Does nothing besides freeing param when the consumer no longer exists.
// If the queue item cannot be allocated the task is dropped for this tick.
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // balance the acquire above
    taosReleaseRef(tmqMgmt.rsetId, refId);
  }
  taosMemoryFree(param);
}

// Timer callback: queue a COMMIT delayed task on the consumer and wake it.
//
// param  heap-allocated int64_t holding the consumer refId; freed here
// tmrId  unused
//
// Does nothing besides freeing param when the consumer no longer exists.
// If the queue item cannot be allocated the task is dropped for this tick.
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = TMQ_DELAYED_TASK__COMMIT;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // balance the acquire above
    taosReleaseRef(tmqMgmt.rsetId, refId);
  }
  taosMemoryFree(param);
}

// Timer callback: queue a REPORT delayed task on the consumer and wake it.
//
// param  heap-allocated int64_t holding the consumer refId; freed here
// tmrId  unused
//
// Does nothing besides freeing param when the consumer no longer exists.
// If the queue item cannot be allocated the task is dropped for this tick.
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = TMQ_DELAYED_TASK__REPORT;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // balance the acquire above
    taosReleaseRef(tmqMgmt.rsetId, refId);
  }
  taosMemoryFree(param);
}

699 700 701 702 703 704
// Heartbeat response callback: the reply carries nothing of interest, so its
// payload (if any) is simply released. param and code are unused. Always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL && pMsg->pData != NULL) {
    taosMemoryFree(pMsg->pData);
  }
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
705 706 707 708 709
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    return;
  }
710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
  pReq->consumerId = consumerId;
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
739
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
740 741
}

L
Liu Jicong 已提交
742 743 744 745 746 747 748 749
// Drain the consumer's delayed-task queue and run each task.
//
//   ASK_EP  -> tmqAskEp(tmq, true), then re-arm the ask-ep timer for 1000 ms
//   COMMIT  -> automatic async commit of all offsets, then re-arm the commit
//              timer for tmq->autoCommitInterval
//   REPORT  -> currently a no-op
//
// Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
// temporary qall cannot be allocated (queued tasks stay in the queue). If the
// timer parameter cannot be allocated the task still runs but that timer is
// not re-armed.
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);

      // the timer callback takes ownership of pRefId and frees it
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
      } else {
        tscError("consumer:%" PRId64 ", failed to re-arm ask ep timer since out of memory", tmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);

      // the timer callback takes ownership of pRefId and frees it
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
      } else {
        tscError("consumer:%" PRId64 ", failed to re-arm commit timer since out of memory", tmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do yet
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
774
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
775
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
776 777 778 779 780 781 782 783
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
784
  msg = NULL;
L
Liu Jicong 已提交
785 786 787 788 789 790 791 792 793 794
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

D
dapan1121 已提交
795
// Subscribe response callback: store the result code in the caller's
// SMqSubscribeCbParam and post its semaphore. pMsg is not used.
// Always returns 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pCbParam = param;

  pCbParam->rspErr = code;
  tsem_post(&pCbParam->rspSem);
  return 0;
}
801

L
Liu Jicong 已提交
802
// Collect the names of the topics the consumer is currently subscribed to.
//
// tmq     consumer
// topics  in/out list; a new list is created when *topics is NULL. Names are
//         appended without the prefix up to and including the first '.';
//         a name without '.' is appended unchanged.
//
// Returns 0 on success, -1 when the list cannot be created or a name cannot
// be appended (names appended before the failure remain in the list).
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    // strchr yields NULL when there is no '.'; never compute NULL + 1
    const char* dot = strchr(topic->topicName, '.');
    const char* shortName = (dot != NULL) ? dot + 1 : topic->topicName;
    if (tmq_list_append(*topics, shortName) < 0) {
      return -1;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
813 814 815
// Unsubscribe from all topics by subscribing to an empty topic list.
// Returns the result of tmq_subscribe, or TSDB_CODE_OUT_OF_MEMORY when the
// temporary list cannot be allocated.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

820 821
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
822

823 824 825 826
  // TODO stop timer
  if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
  if (tmq->qall) taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
827

828
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
829

830 831 832 833 834 835 836 837 838 839
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
    int32_t vgSz = taosArrayGetSize(pTopic->vgs);
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
840 841
}

L
Liu Jicong 已提交
842
// Creates a consumer from `conf`.
//
// Steps: lazily initialise the module-wide timer and reference set, allocate
// the consumer and its queues, copy the configuration, open the connection,
// register the consumer in the reference set and (when hbBgEnable is set)
// start the background heartbeat timer.
//
// errstr / errstrLen are accepted for interface compatibility but are not
// written by this function.
//
// Returns the new consumer, or NULL on failure.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // heap cell carrying the refId to the heartbeat timer; only needed when the
  // background heartbeat is enabled. Declared first so FAIL can release it.
  int64_t* pRefId = NULL;

  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq is NULL here, so only data from conf may be logged
    tscError("consumer setup failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // reserve the heartbeat timer parameter up front, while failure can still be
  // handled by the plain FAIL path (nothing else to undo yet)
  if (pTmq->hbBgEnable) {
    pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      goto FAIL;
    }
  }

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    if (pRefId) taosMemoryFree(pRefId);
    // the consumer is fully built at this point, so use the regular destructor
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pRefId != NULL) {
    // pRefId is handed to the timer as its parameter from here on
    *pRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  if (pRefId) taosMemoryFree(pRefId);
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
947
// Subscribes the consumer to the topics in `topic_list` (an empty list
// unsubscribes from everything).
//
// The request is sent to the mnode and this function blocks until the response
// callback (tmqSubscribeCb) posts the semaphore. Afterwards the endpoint
// information is fetched synchronously, retrying while the mnode reports the
// consumer as not ready, and the ask-ep / auto-commit timers are started if
// they are not running yet.
//
// Returns 0 on success and a non-zero code otherwise. In particular, running
// out of "not ready" retries now returns TSDB_CODE_MND_CONSUMER_NOT_READY
// instead of falling through with code 0.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    // once pushed, the name is released together with req.topicNames in FAIL
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      taosMemoryFree(topicFName);
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  // lives on this stack frame; safe because we block until the callback posts
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent: the request buffer and the send info are
  // no longer released by this function once they were handed to the transport
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // code is 0 at this point; report the failure instead of success
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  // only non-NULL when the request was never handed to the transport
  if (sendInfo != NULL) taosMemoryFree(sendInfo);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1053
// Stores the commit callback and its user parameter in the configuration;
// tmq_consumer_new() copies both into the consumer it creates.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1058

D
dapan1121 已提交
1059
// Response callback of a poll request sent to one vgroup.
//
// On success the payload is decoded into a SMqPollRspWrapper according to its
// message type, the wrapper is queued on tmq->mqueue and tmq->rspSem is posted.
// Responses from an older epoch are dropped. On failure the vgroup is put back
// to IDLE (if the epoch still matches) so that it can be polled again.
//
// `param` (SMqPollCbParam) and pMsg->pData are consumed by this function.
// Returns 0 when the response was queued or dropped as stale, -1 otherwise.
//
// NOTE(review): the reference obtained with taosAcquireRef() is not released
// anywhere in this function - confirm whether a matching release is required.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    // consumer is already gone: just release what belongs to this request
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // copy what is still needed, the parameter itself is no longer used below
  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
  taosMemoryFree(pParam);
  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    // the payload is released exactly once here for every error path below
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        // pMsg->pData was already freed above; freeing it again would be a double free
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      /*pRspWrapper->vgHandle = pVg;*/
      /*pRspWrapper->topicHandle = pTopic;*/
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // In each branch the body behind the SMqRspHead is decoded first, then the
  // raw head is copied over the beginning of the decoded response.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(0);
  }

  taosMemoryFree(pMsg->pData);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;
CREATE_MSG_FAIL:
  // pVg is only touched while the epoch the request was built for is current
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

L
Liu Jicong 已提交
1169
// Replaces the consumer's cached topic / vgroup layout with the one in `pRsp`
// and moves the consumer to `epoch`.
//
// The current offset of every known (topic, vgId) pair is saved in a temporary
// hash keyed "<topicName>:<vgId>" so that vgroups which still exist after the
// update continue from where they were; new vgroups start from the configured
// reset offset. The consumer status becomes NO_TOPIC when the new layout is
// empty and READY otherwise.
//
// Returns true if at least one vgroup was installed, false otherwise (including
// allocation failure of the temporary containers, in which case nothing changes).
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // remember the offsets of the current layout
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, sizeof(buf), &pVgCur->currentOffset);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  // build the new layout
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    // NOTE(review): shallow copy - the schema buffer is now referenced by both
    // pRsp and the new topic; confirm which side is expected to free it.
    topic.schema = pTopicEp->schema;
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        // vgroup was already known: keep its progress
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // release the previous layout
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
      taosArrayDestroy(pTopic->vgs);
    }
    taosArrayDestroy(tmq->clientTopics);
  }
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1261
// Response callback of an ask-ep (endpoint query) request.
//
// Sync mode (pParam->async == 0): tmqAskEp() is blocked on pParam->rspSem and
// owns pParam; the result is stored in pParam->code, a newer layout is applied
// directly via tmqUpdateEp(), and the semaphore is posted.
// Async mode: pParam is owned and freed here; a newer layout is decoded into a
// SMqAskEpRspWrapper and queued on tmq->mqueue for the polling thread.
//
// Responses whose epoch is not newer than the consumer's epoch are ignored.
// pMsg->pData is always released by this function.
//
// NOTE(review): the reference obtained with taosAcquireRef() is not released
// anywhere in this function - confirm whether a matching release is required.
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    taosMemoryFree(pMsg->pData);
    if (!async) {
      // A thread is blocked in tmqAskEp() on this semaphore and will free
      // pParam itself. Destroying the semaphore here would leave it waiting
      // forever, so report the error and wake it up instead. The post must be
      // the last access to pParam.
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // wakes tmqAskEp(), which reads pParam->code and frees pParam
    tsem_post(&pParam->rspSem);
  } else {
    taosMemoryFree(pParam);
  }
  taosMemoryFree(pMsg->pData);
  return code;
}

L
Liu Jicong 已提交
1327
// Asks the mnode for the consumer's current topic / vgroup endpoints.
//
// async == false: blocks until tmqAskEpCb() posts the semaphore in the callback
//                 parameter and returns the code stored by the callback.
// async == true : returns 0 right after the request was handed to the
//                 transport; tmqAskEpCb() frees the callback parameter.
//
// Returns -1 if the request could not be built.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t      code = 0;
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  // consumerId and epoch are written in network byte order
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;

  // the semaphore is only used for the synchronous hand-shake with the callback
  if (!async && tsem_init(&pParam->rspSem, 0, 0) != 0) {
    tscError("failed to init semaphore for ask ep");
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    if (!async) tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // in sync mode pParam stays owned by this function
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

1397
// Allocates and fills a poll request for one vgroup of one topic.
// The subscription key is "<groupId><TMQ_SEPARATOR><topicName>"; the request
// offset is the vgroup's current offset. head.vgId and head.contLen are stored
// in network byte order.
// Returns NULL if the request cannot be allocated; the caller owns the result.
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (!pollReq) {
    return NULL;
  }

  // subKey: group id, separator, topic name (terminated by strcpy)
  int32_t prefixLen = strlen(tmq->groupId);
  char*   cursor = pollReq->subKey;
  memcpy(cursor, tmq->groupId, prefixLen);
  cursor += prefixLen;
  *cursor++ = TMQ_SEPARATOR;
  strcpy(cursor, pTopic->topicName);

  // consumer-level settings
  pollReq->withTbName = tmq->withTbName;
  pollReq->timeout = timeout;
  pollReq->consumerId = tmq->consumerId;
  pollReq->epoch = tmq->epoch;

  // vgroup-level settings
  pollReq->reqOffset = pVg->currentOffset;
  pollReq->reqId = generateRequestId();

  pollReq->useSnapshot = tmq->useSnapshot;

  // message head
  pollReq->head.vgId = htonl(pVg->vgId);
  pollReq->head.contLen = htonl(sizeof(SMqPollReq));

  return pollReq;
}

L
Liu Jicong 已提交
1426 1427
// Builds the user-visible meta response object from a queued poll response.
// Topic name, db name and vgId are taken from the wrapper's topic / vgroup
// handles; the decoded meta response is copied by value.
// Returns NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if allocation fails.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1437 1438 1439
// Builds the user-visible data response object from a queued poll response.
// Topic name, db name and vgId are taken from the wrapper's topic / vgroup
// handles; the decoded data response is copied by value and the block iterator
// starts at -1. When the response carries no schema of its own, the topic's
// schema is installed in the result info.
// Returns NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if allocation fails.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472
// Builds the user-visible taosx response object from a queued poll response.
// Topic name, db name and vgId are taken from the wrapper's topic / vgroup
// handles; the decoded taosx response is copied by value and the block iterator
// starts at -1.
//
// The copy is sized by the destination member: the previous code copied
// sizeof(SMqTaosxRspObj) bytes (the size of the whole object) into the `rsp`
// member, writing past it.
//
// Returns NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if allocation fails.
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TAOSX;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  // tmqPollCb() decodes TAOSX responses into pWrapper->taosxRsp
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(pRspObj->rsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  // NOTE(review): the flag is read through dataRsp although the payload was
  // decoded as taosxRsp - presumably both members overlay each other; confirm.
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1473
/*
 * Send an asynchronous poll request to every vgroup of every client topic
 * whose status can be switched from IDLE to WAIT.
 *
 * A vgroup that is not IDLE is skipped and its skip counter is incremented.
 * For a claimed vgroup the skip counter is cleared, a poll request and its
 * callback parameter are built, and the message is handed to
 * asyncSendMsgToServer with tmqPollCb as the response callback.
 *
 * @return 0 after all vgroups were visited; -1 as soon as building the request
 *         or allocating its bookkeeping fails (the vgroup is put back to IDLE
 *         and rspSem is posted before returning).
 */
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // try to claim the vgroup: IDLE -> WAIT
      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__IDLE) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 skipped);
        continue;
      }

      atomic_store_32(&pVg->vgSkipCnt, 0);

      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      pParam->vgId = pVg->vgId;

      SMsgSendInfo* pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (pSend == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      // pSend was zero-allocated, so only the non-zero fields need values
      pSend->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      pSend->requestId = pReq->reqId;
      pSend->requestObjRefId = 0;
      pSend->param = pParam;
      pSend->fp = tmqPollCb;
      pSend->msgType = TDMT_VND_CONSUME;

      char offsetText[80];
      tFormatOffset(offsetText, sizeof(offsetText), &pVg->currentOffset);
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetText, pReq->reqId);

      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pSend);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1551 1552
/*
 * Handle a queued response that is not a poll response.
 *
 * Only endpoint (EP) responses are accepted. If the response carries an epoch
 * newer than the consumer's current epoch, the endpoint information is applied
 * through tmqUpdateEp and *pReset is set to true; otherwise *pReset is set to
 * false.
 *
 * @return 0 for an EP response, -1 for any other type (*pReset is left
 *         untouched in that case).
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    // not newer than what the consumer already has: nothing to apply
    *pReset = false;
    return 0;
  }

  SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
  tmqUpdateEp(tmq, rspWrapper->epoch, pEpRsp);
  *pReset = true;
  return 0;
}

L
Liu Jicong 已提交
1569
/*
 * Consume queued responses until one produces a result object.
 *
 * Items are taken from tmq->qall; when it yields nothing, taosReadAllQitems is
 * called on tmq->mqueue/tmq->qall (presumably refilling qall) and one more
 * fetch is attempted.
 *
 * Per response type:
 *  - END_RSP: sets terrno to TSDB_CODE_TQ_NO_COMMITTED_OFFSET and returns NULL.
 *  - POLL_RSP / POLL_META_RSP / TAOSX_RSP: if the response epoch equals the
 *    consumer epoch, the vgroup offset is advanced and the vgroup is set back
 *    to IDLE; a result object is built and returned (data and TAOSX responses
 *    with zero blocks are dropped instead). Responses from another epoch are
 *    discarded.
 *  - anything else: passed to tmqHandleNoPollRsp; when it reports a reset and
 *    pollIfReset is true, a new round of polls is sent.
 *
 * @return a result object, or NULL when no queued item is left or an END_RSP
 *         was seen.
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        return NULL;
      }
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPoll->dataRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->dataRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffset = pPoll->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->dataRsp.blockNum == 0) {
        // empty response: offset was advanced, nothing to hand to the caller
        taosFreeQitem(pPoll);
        continue;
      }

      SMqRspObj* pDataObj = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pDataObj;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPoll->metaRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->metaRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffset = pPoll->metaRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaObj = tmqBuildMetaRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pMetaObj;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      if (pPoll->taosxRsp.head.epoch != curEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->taosxRsp.head.epoch, curEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffset = pPoll->taosxRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->taosxRsp.blockNum == 0) {
        taosFreeQitem(pPoll);
        continue;
      }

      // NOTE(review): a TAOSX response is turned into a plain data result via
      // tmqBuildRspFromWrapper here - confirm this is intended.
      SMqRspObj* pDataObj = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pDataObj;
    }

    // every other type goes through the non-poll handler; its return code is not inspected
    bool reset = false;
    tmqHandleNoPollRsp(tmq, pItem, &reset);
    taosFreeQitem(pItem);
    if (pollIfReset && reset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1662
/*
 * Poll the consumer for the next result.
 *
 * @param tmq      consumer handle
 * @param timeout  maximum time to wait, compared against millisecond
 *                 timestamps; -1 means wait without a deadline
 * @return a result object, or NULL when
 *         - the consumer is still in INIT status,
 *         - the consumer is in RECOVER status and tmqAskEp keeps reporting
 *           "consumer not ready" after the retry budget is used up,
 *         - sending poll requests fails,
 *         - terrno is TSDB_CODE_TQ_NO_COMMITTED_OFFSET after handling responses,
 *         - the timeout elapses without a result.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  // NOTE(review): the original note here says delayed tasks should also be
  // processed in the no-topic state; as written, INIT returns before doing so.
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait for what is LEFT of the budget, not for what has already elapsed:
      // waiting on the elapsed time grows with every pass and can overshoot
      // the caller's timeout. The *1000 scaling is kept from the original call.
      tsem_timewait(&tmq->rspSem, (timeout - elapsed) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1717
/*
 * Close a consumer.
 *
 * For a consumer in READY status the current offsets are committed
 * synchronously and an empty topic list is subscribed (retried while the
 * server answers "consumer not ready", at most a few times with a 500 ms
 * pause). Afterwards the consumer's reference is removed from the tmq
 * reference set regardless of the status it was in.
 *
 * @return 0 on success, or the error code of the synchronous commit when it
 *         fails (the reference is NOT released in that case, so the caller
 *         can retry).
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }
      retryCnt++;
      taosMsleep(500);
    }

    tmq_list_destroy(lst);
    // The result of the final subscribe call is intentionally not propagated.
  }

  // Release the reference on every successful close path; previously the READY
  // path returned before reaching this call and the reference was never removed.
  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1744

L
Liu Jicong 已提交
1745 1746
/*
 * Map a tmq return code to a message string.
 *
 * 0 maps to "success", -1 to "fail"; every other value is forwarded to
 * tstrerror.
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1754

L
Liu Jicong 已提交
1755 1756 1757 1758
/*
 * Classify a result object returned by the consumer.
 *
 * Data results map to TMQ_RES_DATA. Meta results map to TMQ_RES_TABLE_META,
 * except when the meta message type is TDMT_VND_DELETE, which is reported as
 * TMQ_RES_TAOSX. TAOSX results map to TMQ_RES_TAOSX. Anything else is
 * TMQ_RES_INVALID.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    return (pMeta->metaRsp.resMsgType == TDMT_VND_DELETE) ? TMQ_RES_TAOSX : TMQ_RES_TABLE_META;
  }

  if (TD_RES_TMQ_TAOSX(res)) {
    return TMQ_RES_TAOSX;
  }

  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1771
/*
 * Return the topic name of a consumer result.
 *
 * The stored name is stripped of everything up to and including its first
 * '.'; if the stored name contains no '.', it is returned unchanged.
 * Data, meta and TAOSX results are supported.
 *
 * @return pointer into the result object's own storage (not a copy), or NULL
 *         when res is not one of the supported result types.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fname = NULL;

  if (TD_RES_TMQ(res)) {
    fname = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fname = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_TAOSX(res)) {
    fname = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr yields NULL when there is no '.'; never do arithmetic on that
  const char* dot = strchr(fname, '.');
  return (dot != NULL) ? dot + 1 : fname;
}

L
Liu Jicong 已提交
1783 1784 1785 1786
/*
 * Return the database name of a consumer result.
 *
 * The stored name is stripped of everything up to and including its first
 * '.'; if the stored name contains no '.', it is returned unchanged.
 * Data, meta and TAOSX results are supported.
 *
 * @return pointer into the result object's own storage (not a copy), or NULL
 *         when res is not one of the supported result types.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fname = NULL;

  if (TD_RES_TMQ(res)) {
    fname = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fname = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_TAOSX(res)) {
    fname = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr yields NULL when there is no '.'; never do arithmetic on that
  const char* dot = strchr(fname, '.');
  return (dot != NULL) ? dot + 1 : fname;
}

L
Liu Jicong 已提交
1795 1796 1797 1798
/*
 * Return the vgroup id a consumer result came from.
 *
 * Data, meta and TAOSX results are supported.
 *
 * @return the vgroup id stored in the result object, or -1 when res is not
 *         one of the supported result types.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_TAOSX(res)) {
    SMqTaosxRspObj* pTaosxRspObj = (SMqTaosxRspObj*)res;
    return pTaosxRspObj->vgId;
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
1806 1807 1808 1809 1810 1811 1812 1813

/*
 * Return the table name of the block the result iterator currently points at.
 *
 * Only data results are supported. NULL is returned when the response carries
 * no table names, or when the iterator is outside [0, blockNum).
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pRspObj = (SMqRspObj*)res;

  bool hasNames = pRspObj->rsp.withTbName && pRspObj->rsp.blockTbName != NULL;
  bool iterInRange = pRspObj->resIter >= 0 && pRspObj->resIter < pRspObj->rsp.blockNum;
  if (!hasNames || !iterInRange) {
    return NULL;
  }

  return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
}
1818

L
Liu Jicong 已提交
1819
/*
 * Commit without returning a result to the caller; cb and param are handed
 * through to tmqCommitInner.
 *
 * NOTE(review): the literal arguments 0 and 1 presumably select
 * "not automatic" and "asynchronous" - confirm against tmqCommitInner.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

1824 1825
/*
 * Commit and return the code produced by tmqCommitInner; no callback is
 * registered.
 *
 * NOTE(review): the literal arguments 0 and 0 presumably select
 * "not automatic" and "synchronous" - confirm against tmqCommitInner.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}