clientTmq.c 55.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
31 32 33
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
L
Liu Jicong 已提交
34 35 36
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
37

L
Liu Jicong 已提交
38 39 40 41 42 43
// Generic view of a queued response item: tmqClearUnhandleMsg() drains the
// consumer queues through this type. The other *RspWrapper structs in this
// file start with the same two members.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
44 45 46
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
47 48
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
49
struct tmq_list_t {
L
Liu Jicong 已提交
50
  SArray container;
L
Liu Jicong 已提交
51
};
L
Liu Jicong 已提交
52

L
Liu Jicong 已提交
53
struct tmq_conf_t {
54 55 56 57 58
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
59 60
  int8_t  snapEnable;
  int32_t snapBatchSize;
61 62 63

  bool hbBgEnable;

64 65 66 67 68
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
69
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
70
  void*          commitCbUserParam;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
74
  int64_t refId;
L
Liu Jicong 已提交
75
  // conf
76 77 78 79 80 81 82 83 84 85 86
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
87 88
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
89 90 91 92

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
93 94
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
95
  int32_t epSkipCnt;
L
Liu Jicong 已提交
96
#endif
L
Liu Jicong 已提交
97 98
  int64_t pollCnt;

L
Liu Jicong 已提交
99
  // timer
100 101
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
102 103 104
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
105 106 107 108
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
109
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
110
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
111
  STaosQall*  qall;
L
Liu Jicong 已提交
112 113 114 115
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
116 117
};

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
// Status values for a vgroup handle (presumably stored in
// SMqClientVg.vgStatus — TODO confirm against the poll code).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer-level status values (presumably stored in tmq_t.status — TODO
// confirm against the code that assigns it).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
130
// Task tags written to tmq_t.delayedTask by the timer callbacks and
// dispatched in tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
136
typedef struct {
137 138 139
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
142
  // connection info
143
  int32_t vgId;
X
Xiaoyu Wang 已提交
144
  int32_t vgStatus;
L
Liu Jicong 已提交
145
  int32_t vgSkipCnt;
146 147 148
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
149
typedef struct {
150
  // subscribe info
151 152
  char topicName[TSDB_TOPIC_FNAME_LEN];
  char db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
153 154 155

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
156
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
168
  };
L
Liu Jicong 已提交
169 170
} SMqPollRspWrapper;

L
Liu Jicong 已提交
171
typedef struct {
172 173
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
174 175
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
176
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
177

L
Liu Jicong 已提交
178
typedef struct {
179 180
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
181
  int32_t code;
L
Liu Jicong 已提交
182
  int32_t async;
X
Xiaoyu Wang 已提交
183
  tsem_t  rspSem;
184 185
} SMqAskEpCbParam;

L
Liu Jicong 已提交
186
typedef struct {
187 188
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
189
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
190
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
191
  int32_t         vgId;
L
Liu Jicong 已提交
192
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
193
} SMqPollCbParam;
194

195
typedef struct {
196 197
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
198 199
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
200 201
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
202
  int32_t        rspErr;
203
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
204 205 206 207
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
208 209 210 211 212
} SMqCommitCbParamSet;

// Per-request context handed to tmqCommitCb().
typedef struct {
  SMqCommitCbParamSet* params;   // shared state of the enclosing commit call
  STqOffset*           pOffset;  // offset that was sent; freed in tmqCommitCb()
} SMqCommitCbParam;
214

215
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
216
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
217
  conf->withTbName = false;
L
Liu Jicong 已提交
218
  conf->autoCommit = true;
L
Liu Jicong 已提交
219
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
220
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
221
  conf->hbBgEnable = true;
222 223 224
  return conf;
}

L
Liu Jicong 已提交
225
// Releases a configuration object together with the connection strings it
// owns. Passing NULL is a no-op.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (!conf) return;

  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
235
  if (strcmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
236
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
237
    return TMQ_CONF_OK;
238
  }
L
Liu Jicong 已提交
239

240
  if (strcmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
241
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
242 243
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
244

L
Liu Jicong 已提交
245 246
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
247
      conf->autoCommit = true;
L
Liu Jicong 已提交
248 249
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
250
      conf->autoCommit = false;
L
Liu Jicong 已提交
251 252 253 254
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
255
  }
L
Liu Jicong 已提交
256

L
Liu Jicong 已提交
257 258 259 260 261
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274 275
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
276

277 278
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
279
      conf->withTbName = true;
L
Liu Jicong 已提交
280
      return TMQ_CONF_OK;
281
    } else if (strcmp(value, "false") == 0) {
282
      conf->withTbName = false;
L
Liu Jicong 已提交
283
      return TMQ_CONF_OK;
284 285 286 287 288
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
289
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
290
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
291
      conf->snapEnable = true;
292 293
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
294
      conf->snapEnable = false;
295 296 297 298 299 300
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
301 302 303 304 305
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

306 307 308
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
309 310
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
311
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
312 313 314 315
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
316
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
317 318
  }

L
Liu Jicong 已提交
319
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
320 321 322
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
323
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
324 325 326
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
327
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
328 329 330
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
331
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
332 333 334
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
335
  if (strcmp(key, "td.connect.db") == 0) {
336
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
337 338 339
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
340
  return TMQ_CONF_UNKNOWN;
341 342 343
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
344 345
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
346 347
}

L
Liu Jicong 已提交
348 349
// Appends a heap copy of the topic name src to list.
//
// Names that do not start with a backquote are lower-cased in the copy.
// Returns 0 on success and -1 when an argument is NULL, src is empty, or
// memory could not be allocated; nothing is added to the list on failure.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;
  SArray* container = &list->container;

  char* topic = strdup(src);
  if (topic == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  if (taosArrayPush(container, &topic) == NULL) {
    // The array did not take ownership, so the copy must be released here.
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
359
// Frees every topic string held by list and then the list itself.
// Passing NULL is a no-op.
void tmq_list_destroy(tmq_list_t* list) {
  // Guard before forming &list->container: member access through a NULL
  // pointer is undefined behaviour.
  if (list == NULL) return;

  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
364 365 366 367 368 369 370 371 372 373
// Returns the number of topic names currently stored in list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* topics = &list->container;
  int32_t       count = taosArrayGetSize(topics);
  return count;
}

// Exposes the list's backing storage as an array of C strings. The pointer
// is the array's own pData buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* topics = &list->container;
  char**        names = topics->pData;
  return names;
}

L
Liu Jicong 已提交
374 375 376 377
// Writes "<topicName>:<vg>" into dst and returns the number of characters
// written (excluding the terminating NUL). dst must be large enough; no
// bound is applied.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
// Finishes a commit once no more responses are outstanding.
//
// Ownership of pParamSet depends on the mode:
//   - async: this function owns it; it invokes the matching callback (if
//     any) and frees it.
//   - sync:  the caller blocked on rspSem owns it; this function only wakes
//     the waiter and must not touch pParamSet afterwards.
//
// Returns 0 on success, or -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED
// when the consumer can no longer be looked up.
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // Copy what is needed later: after tsem_post() the sync waiter may free pParamSet.
  const int64_t refId = pParamSet->refId;
  const int8_t  async = pParamSet->async;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (async) {
      tsem_destroy(&pParamSet->rspSem);
      taosMemoryFree(pParamSet);
    } else {
      // A sync caller is (or will be) waiting on rspSem and frees pParamSet
      // itself; freeing here would leave it blocked on released memory.
      pParamSet->rspErr = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParamSet->rspSem);
    }
    return -1;
  }

  if (async) {
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

  // Balance the taosAcquireRef() above.
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}

410 411
// Response callback for one offset-commit request.
//
// param is the SMqCommitCbParam built in tmqSendCommitReq(); pBuf carries the
// response payload and code the response status. Frees the offset and the
// payload, records a failure in the shared parameter set, and completes the
// commit when this was the last outstanding response. Always returns 0.
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  taosMemoryFree(pParam->pOffset);
  if (pBuf && pBuf->pData) taosMemoryFree(pBuf->pData);

  if (code != 0) {
    // Keep the failure so tmqCommitDone() can report it. This must happen
    // before the count-down below, which is what publishes completion.
    pParamSet->rspErr = code;
  }

  // count down waiting rsp
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }
  return 0;
}

L
Liu Jicong 已提交
438 439 440 441 442 443
// Builds and sends one offset-commit request for pVg of pTopic.
//
// The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". On success
// the request counters in pParamSet are incremented, pVg->committedOffset is
// advanced to the current offset, and 0 is returned; the response is handled
// by tmqCommitCb(). On failure -1 is returned, every temporary allocation is
// released, and pParamSet is left untouched.
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  void*             buf = NULL;
  SMqCommitCbParam* pParam = NULL;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffset;

  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) goto FAIL;

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;

  // Count the request before it leaves, so the callback's count-down can
  // never observe a value that does not include it.
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

FAIL:
  // Nothing has been handed to the transport yet, so everything is still ours.
  if (pParam) taosMemoryFree(pParam);
  if (buf) taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

// Commits the offset of the single topic/vgroup that msg was polled from.
//
// msg must be a TMQ data, meta, or metadata result; any other result type
// yields TSDB_CODE_TMQ_INVALID_MSG.
//
// Sync mode (async == 0): blocks until the response arrives and returns its
// code; returns -1 when the request could not be built or sent.
// Async mode: returns 0 once the request is on its way (completion is
// reported by tmqCommitDone()); when the request could not be sent, userCb,
// if non-NULL, is invoked with the error and 0 is returned.
// Returns 0 without sending anything when the offset is already committed or
// the topic/vgroup is not known to this consumer.
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = -1;
  // Tracked locally: in async mode the response callback may free pParamSet
  // as soon as the request is sent, so its counters must not be read back.
  bool sent = false;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          // Nothing was sent, so no callback can reference pParamSet.
          tsem_destroy(&pParamSet->rspSem);
          taosMemoryFree(pParamSet);
          goto FAIL;
        }
        sent = true;
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (!sent) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return code;
  }

  // Async: pParamSet now belongs to the response path.
  return 0;

FAIL:
  if (!async) {
    return code;
  }
  if (userCb) {
    userCb(tmq, code, userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
589 590
// Commits consumer offsets.
//
// With a non-NULL msg only the topic/vgroup of that message is committed
// (see tmqCommitMsgImpl()). With msg == NULL every vgroup of every subscribed
// topic whose current offset differs from its committed offset is committed.
//
// automatic selects which callback reports an async result: tmq->commitCb
// when non-zero, userCb otherwise. In sync mode (async == 0) the call blocks
// until all responses arrived and returns the recorded response code. In
// async mode it returns 0 once the requests are on their way. Returns -1
// when the shared state could not be allocated; in async mode the selected
// callback, if set, is invoked with TSDB_CODE_OUT_OF_MEMORY first.
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    if (async) {
      if (automatic) {
        if (tmq->commitCb) tmq->commitCb(tmq, TSDB_CODE_OUT_OF_MEMORY, tmq->commitCbUserParam);
      } else if (userCb) {
        userCb(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
      }
    }
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // Start at 1 so that responses arriving while requests are still being
  // sent cannot drive the counter to zero; the extra count is dropped below.
  pParamSet->waitingRspNum = 1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          // Best effort: a vgroup whose request could not be built is skipped.
          continue;
        }
      }
    }
  }

  // Safe to read: the initial count of 1 keeps pParamSet alive until the
  // count-down below.
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }

  if (async) {
    // pParamSet belongs to the response path from here on.
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

673
// Shared body of the timer callbacks below: queues taskType on the consumer
// identified by the refId stored in param and wakes the consumer through
// rspSem. param is always freed. If the consumer is gone or the queue item
// cannot be allocated, nothing is queued.
static void tmqAssignDelayedTask(void* param, int8_t taskType) {
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    if (pTaskType != NULL) {
      *pTaskType = taskType;
      taosWriteQitem(tmq->delayedTask, pTaskType);
      tsem_post(&tmq->rspSem);
    }
    // Balance the taosAcquireRef() above.
    taosReleaseRef(tmqMgmt.rsetId, refId);
  }
  taosMemoryFree(param);
}

// Timer callback: queues a TMQ_DELAYED_TASK__ASK_EP task.
// param is a heap-allocated int64_t refId; tmrId is unused.
void tmqAssignAskEpTask(void* param, void* tmrId) {
  tmqAssignDelayedTask(param, TMQ_DELAYED_TASK__ASK_EP);
}

// Timer callback: queues a TMQ_DELAYED_TASK__COMMIT task.
// param is a heap-allocated int64_t refId; tmrId is unused.
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmqAssignDelayedTask(param, TMQ_DELAYED_TASK__COMMIT);
}

// Timer callback: queues a TMQ_DELAYED_TASK__REPORT task.
// param is a heap-allocated int64_t refId; tmrId is unused.
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmqAssignDelayedTask(param, TMQ_DELAYED_TASK__REPORT);
}

709 710 711 712 713 714
// Heartbeat response callback: the response carries nothing of interest, so
// its payload (when present) is simply released. param and code are unused.
// Always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL || pMsg->pData == NULL) {
    return 0;
  }
  taosMemoryFree(pMsg->pData);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
715 716 717
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
718
    taosMemoryFree(param);
719 720
    return;
  }
721 722 723 724
  int64_t   consumerId = tmq->consumerId;
  int32_t   epoch = tmq->epoch;
  SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
  if (pReq == NULL) goto OVER;
L
Liu Jicong 已提交
725
  pReq->consumerId = htobe64(consumerId);
726 727 728 729 730
  pReq->epoch = epoch;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
731
    goto OVER;
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = sizeof(SMqHbReq),
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
  sendInfo->msgType = TDMT_MND_MQ_HB;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
751
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
752 753
}

L
Liu Jicong 已提交
754 755 756 757 758 759 760 761
// Drains tmq->delayedTask and runs every queued task:
//   - ASK_EP: asks for endpoints asynchronously and re-arms epTimer (1000 ms)
//   - COMMIT: starts an automatic async commit and re-arms commitTimer
//             (tmq->autoCommitInterval)
//   - REPORT: currently a no-op
//
// Returns 0, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the temporary
// drain list cannot be allocated. If the refId copy for a timer cannot be
// allocated, that timer is not re-armed.
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);

      // The timer callback receives (and frees) its own copy of the refId.
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
      } else {
        tscError("consumer:%" PRId64 ", failed to re-arm ask ep timer since out of memory", tmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
      } else {
        tscError("consumer:%" PRId64 ", failed to re-arm commit timer since out of memory", tmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }

    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
786
// Discards every response that has not been handed to the user yet: first
// whatever is left in tmq->qall, then everything still waiting in
// tmq->mqueue (moved into qall and drained the same way).
void tmqClearUnhandleMsg(tmq_t* tmq) {
  // Pass 0 drains the items already in qall; pass 1 pulls in mqueue first.
  for (int32_t pass = 0; pass < 2; pass++) {
    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }
    while (1) {
      // Reset on every iteration, as tmqHandleAllDelayedTask() does: a stale
      // pointer to the item just freed must never be mistaken for a new item.
      SMqRspWrapper* msg = NULL;
      taosGetQitem(tmq->qall, (void**)&msg);
      if (msg == NULL) break;
      taosFreeQitem(msg);
    }
  }
}

D
dapan1121 已提交
807
// Subscribe response callback: stores the response code in the
// SMqSubscribeCbParam passed as param and posts its semaphore.
// pMsg is not used. Always returns 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pSubParam = param;

  pSubParam->rspErr = code;
  tsem_post(&pSubParam->rspSem);
  return 0;
}
813

L
Liu Jicong 已提交
814
// Appends the names of all currently subscribed topics to *topics, creating
// the list first when *topics is NULL.
//
// The stored topic name is reported from the character after its first '.';
// a name without a '.' is reported unchanged. Returns 0 on success or
// TSDB_CODE_OUT_OF_MEMORY when the list could not be created. Failures of
// individual appends are not reported.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    // strchr() returns NULL when there is no '.'; adding 1 to that would
    // produce an invalid pointer.
    const char* dot = strchr(topic->topicName, '.');
    const char* name = (dot != NULL) ? dot + 1 : topic->topicName;
    tmq_list_append(*topics, name);
  }
  return 0;
}

L
Liu Jicong 已提交
825
// Drops all subscriptions by subscribing to an empty topic list.
//
// While tmq_subscribe() answers TSDB_CODE_MND_CONSUMER_NOT_READY the call is
// retried after a 500 ms pause, until retryCnt exceeds 5. Returns the result
// of the last tmq_subscribe() call, or TSDB_CODE_OUT_OF_MEMORY when the
// empty list could not be created.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  int32_t     rsp;
  int32_t     retryCnt = 0;
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    }
    retryCnt++;
    taosMsleep(500);
  }

  tmq_list_destroy(lst);
  return rsp;
}

843 844
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
845

846 847 848 849
  // TODO stop timer
  if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
  if (tmq->qall) taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
850

851
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
852

853 854 855
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
856
    if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
857 858 859 860 861 862
    int32_t vgSz = taosArrayGetSize(pTopic->vgs);
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
863 864
}

L
Liu Jicong 已提交
865
/**
 * Create a consumer from a configuration object.
 *
 * On first use this also initialises the module-wide timer and the reference
 * set that manages consumer lifetime. The consumer then gets its queues, a
 * response semaphore, a server connection and a reference id; when
 * conf->hbBgEnable is set a heartbeat timer is started as well.
 *
 * @param conf       configuration; conf->groupId must be non-empty.
 * @param errstr     not used by this function.
 * @param errstrLen  not used by this function.
 * @return the new consumer, or NULL on failure.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("setting up new consumer failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  // argument handed to the heartbeat timer; declared before the first
  // `goto FAIL` so the cleanup code can always free it
  int64_t* pHbRefId = NULL;

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  // set conf and assign the consumer id first, so that every log line below
  // (including the failure ones) reports the real id and group
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->hbBgEnable = conf->hbBgEnable;
  pTmq->consumerId = tGenIdPI64();

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // allocate the heartbeat timer argument while failure is still cheap to
  // unwind (nothing but the queues exists yet)
  if (pTmq->hbBgEnable) {
    pHbRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pHbRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      goto FAIL;
    }
  }

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    // fully constructed at this point, so use the regular destructor
    tmqFreeImpl(pTmq);
    taosMemoryFree(pHbRefId);
    return NULL;
  }

  if (pTmq->hbBgEnable) {
    *pHbRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pHbRefId, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pHbRefId);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
968
/**
 * Subscribe the consumer to the topics in topic_list (an empty list
 * unsubscribes from everything).
 *
 * Sends a TDMT_MND_SUBSCRIBE request to the mnode and blocks until the
 * response arrives, then asks for the topic endpoints until the consumer is
 * reported ready, and finally starts the endpoint and auto-commit timers if
 * they are not running yet.
 *
 * @param tmq         consumer handle.
 * @param topic_list  topics to subscribe to.
 * @return 0 on success; otherwise the server response code,
 *         TSDB_CODE_MND_CONSUMER_NOT_READY when the consumer never became
 *         ready, TSDB_CODE_OUT_OF_MEMORY when a timer argument cannot be
 *         allocated, or -1 for other local failures.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  tscDebug("call tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // sendInfo has not been handed to the transport yet, so it is still ours
    taosMemoryFree(sendInfo);
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;  // lives on this stack frame; valid because we wait below
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // code is still 0 here; report the failure instead of a false success
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);

  return code;
}

L
Liu Jicong 已提交
1075
/**
 * Register the auto-commit callback and its user argument in a configuration.
 *
 * @param conf   configuration to update.
 * @param cb     callback stored in conf->commitCb.
 * @param param  opaque pointer stored in conf->commitCbUserParam.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1080

D
dapan1121 已提交
1081
/**
 * Response callback for a poll request issued to one vgroup.
 *
 * Decodes the response into an SMqPollRspWrapper, pushes it onto tmq->mqueue
 * and posts tmq->rspSem. On failure the vgroup is put back to IDLE (when the
 * request epoch is still the current one) so it can be polled again.
 *
 * The callback owns pParam, pMsg->pData and pMsg->pEpSet and frees them on
 * every path. The consumer reference obtained with taosAcquireRef() is given
 * back with taosReleaseRef() before returning.
 *
 * @param param  SMqPollCbParam allocated by the poll request.
 * @param pMsg   response buffer.
 * @param code   response code of the request.
 * @return 0 when the response was queued or discarded as stale, -1 otherwise.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  int64_t         refId = pParam->refId;  // kept: pParam is freed before the release

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
  taosMemoryFree(pParam);

  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosReleaseRef(tmqMgmt.rsetId, refId);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // in each branch the body is decoded first, then the response head is
  // copied over the start of the decoded struct
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(0);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;

CREATE_MSG_FAIL:
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return -1;
}

L
Liu Jicong 已提交
1194
/**
 * Replace the consumer's topic/vgroup assignment with the one in pRsp.
 *
 * The current offset of every known (topic, vgId) pair is remembered in a
 * temporary hash so that a vgroup which is still assigned keeps its offset;
 * vgroups that are new start from tmq->resetOffsetCfg. Schema ownership is
 * moved from pRsp into the new topic entries. The old topic array is freed,
 * the consumer status is set to READY or NO_TOPIC, and the epoch is stored.
 *
 * @param tmq    consumer handle.
 * @param epoch  epoch carried by the response.
 * @param pRsp   decoded ask-ep response.
 * @return true if at least one vgroup was assigned, false otherwise
 *         (including allocation failure of the temporary containers).
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // remember the offsets of the current assignment, keyed by "<topic>:<vgId>"
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  // build the new assignment
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    // take over the schema so that releasing pRsp does not free it
    topic.schema = pTopicEp->schema;
    pTopicEp->schema.nCols = 0;
    pTopicEp->schema.pSchema = NULL;
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // drop the previous assignment
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
      taosArrayDestroy(pTopic->vgs);
    }
    taosArrayDestroy(tmq->clientTopics);
  }
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1288
/**
 * Response callback for an ask-endpoint request sent by tmqAskEp().
 *
 * Responses whose epoch is not newer than the consumer's are ignored. For a
 * synchronous request the response is decoded and applied immediately with
 * tmqUpdateEp(); for an asynchronous one it is wrapped and queued on
 * tmq->mqueue for the polling thread.
 *
 * Ownership: a synchronous caller waits on pParam->rspSem and frees pParam
 * itself, so this function only posts; for an asynchronous request pParam is
 * freed here. The response payload is always freed here, and the consumer
 * reference obtained with taosAcquireRef() is released before returning.
 *
 * @param param  SMqAskEpCbParam of the request.
 * @param pMsg   response buffer.
 * @param code   response code of the request.
 * @return code, or -1 when the consumer is gone or the wrapper cannot be
 *         allocated.
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  int64_t          refId = pParam->refId;  // kept: pParam may be gone at release time
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  if (tmq == NULL) {
    if (!async) {
      tsem_destroy(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d, code %x", tmq->consumerId,
             pParam->async, code);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // the waiter owns pParam; do not touch it after this post
    tsem_post(&pParam->rspSem);
  } else {
    taosMemoryFree(pParam);
  }
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return code;
}

L
Liu Jicong 已提交
1355
/**
 * Ask the mnode for the current topic endpoints of this consumer.
 *
 * Builds an SMqAskEpReq and sends it with tmqAskEpCb() as the response
 * callback.
 *
 * - async == false: blocks until the callback posts the request semaphore,
 *   then returns the code stored by the callback and frees the request
 *   parameter.
 * - async == true: returns 0 right after sending; the callback frees the
 *   request parameter. No semaphore is created in this mode because the
 *   callback only uses it for synchronous requests.
 *
 * @param tmq    consumer handle.
 * @param async  whether to return without waiting for the response.
 * @return 0 or the response code; -1 on local allocation/initialisation
 *         failure.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t      code = 0;
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;

  // only a synchronous caller waits, so only it needs the semaphore
  if (!async && tsem_init(&pParam->rspSem, 0, 0) != 0) {
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    if (!async) {
      tsem_destroy(&pParam->rspSem);
    }
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // the callback does not touch pParam after posting, so tear it down here
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

1425
/**
 * Allocate and fill a poll request for one vgroup of one topic.
 *
 * The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". The request
 * carries the consumer's id, epoch and options together with the vgroup's
 * current offset; vgId and contLen in the head are stored in network order.
 *
 * @param tmq      consumer handle.
 * @param timeout  value copied into the request's timeout field.
 * @param pTopic   topic the vgroup belongs to.
 * @param pVg      vgroup to poll.
 * @return the new request (owned by the caller), or NULL if allocation fails.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pReq == NULL) return NULL;

  // message head
  pReq->head.vgId = htonl(pVg->vgId);
  pReq->head.contLen = htonl(sizeof(SMqPollReq));

  // subscription key: group id, separator, topic name
  int32_t groupLen = strlen(tmq->groupId);
  char*   pKey = pReq->subKey;
  memcpy(pKey, tmq->groupId, groupLen);
  pKey += groupLen;
  *pKey = TMQ_SEPARATOR;
  pKey += 1;
  strcpy(pKey, pTopic->topicName);

  // consumer-level settings
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;

  // per-request settings
  pReq->timeout = timeout;
  pReq->reqOffset = pVg->currentOffset;
  pReq->reqId = generateRequestId();

  return pReq;
}

L
Liu Jicong 已提交
1454 1455
/**
 * Build the result object handed to the user for a meta poll response.
 *
 * Copies topic name, db name and vgId from the wrapper's handles and takes a
 * shallow copy of the decoded meta response.
 *
 * @param pWrapper  queued poll response of type meta.
 * @return the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if
 *         allocation fails.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1465 1466 1467
/**
 * Build the result object handed to the user for a data poll response.
 *
 * Copies topic name, db name and vgId from the wrapper's handles, takes a
 * shallow copy of the decoded data response and resets the iteration state.
 * When the response carries no schema of its own, the topic's schema is
 * installed in the result info.
 *
 * @param pWrapper  queued poll response of type data.
 * @return the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if
 *         allocation fails.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1483 1484
/**
 * Build the result object handed to the user for a taosx (meta + data) poll
 * response.
 *
 * Copies topic name, db name and vgId from the wrapper's handles, takes a
 * shallow copy of the decoded taosx response and resets the iteration state.
 * When the response carries no schema of its own, the topic's schema is
 * installed in the result info.
 *
 * @param pWrapper  queued poll response of type taosx.
 * @return the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if
 *         allocation fails.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1501
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1502
  /*tscDebug("call poll");*/
X
Xiaoyu Wang 已提交
1503 1504 1505 1506 1507 1508
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1509
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1510 1511
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
X
Xiaoyu Wang 已提交
1512
        continue;
L
Liu Jicong 已提交
1513
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1514 1515 1516 1517
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
S
Shengliang Guan 已提交
1518
        tscDebug("consumer:%" PRId64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1519 1520
        }
#endif
X
Xiaoyu Wang 已提交
1521
      }
L
Liu Jicong 已提交
1522
      atomic_store_32(&pVg->vgSkipCnt, 0);
1523
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
X
Xiaoyu Wang 已提交
1524 1525
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1526
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1527 1528
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1529
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1530
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1531
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1532
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1533
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1534 1535
        return -1;
      }
1536 1537 1538
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;

L
Liu Jicong 已提交
1539
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1540
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1541
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1542

1543
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1544
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1545 1546
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1547
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1548
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1549 1550 1551 1552
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1553
          .pData = pReq,
L
Liu Jicong 已提交
1554
          .len = sizeof(SMqPollReq),
X
Xiaoyu Wang 已提交
1555 1556
          .handle = NULL,
      };
L
Liu Jicong 已提交
1557
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1558
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1559
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1560
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1561
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1562 1563

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1564
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1565

1566
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1567
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
L
Liu Jicong 已提交
1568 1569
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
S
Shengliang Guan 已提交
1570
      /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1571 1572 1573 1574 1575 1576 1577 1578
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1579 1580
/*
 * Handle a queued response that is not a poll response.
 *
 * Only TMQ_MSG_TYPE__EP_RSP is accepted. When its epoch is newer than the
 * consumer's current epoch the endpoint set is updated via tmqUpdateEp and
 * *pReset is set to true; otherwise *pReset is set to false.
 *
 * Returns 0 for an endpoint response, -1 for any other type (in which case
 * *pReset is left untouched).
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool epochAdvanced = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (epochAdvanced) {
    SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tmqUpdateEp(tmq, rspWrapper->epoch, &pEpWrapper->msg);
  }

  *pReset = epochAdvanced;
  return 0;
}

L
Liu Jicong 已提交
1597
/*
 * Process queued responses until one yields a result object for the caller.
 *
 * Items are taken from tmq->qall; when it is empty it is refilled from
 * tmq->mqueue once. Per response type:
 *   - END_RSP:       sets terrno to TSDB_CODE_TQ_NO_COMMITTED_OFFSET, returns NULL.
 *   - POLL_RSP / POLL_META_RSP / TAOSX_RSP: if the response epoch equals the
 *     consumer epoch, the vgroup offset is advanced, the vgroup is marked
 *     IDLE and a result object is built and returned (empty data / taosx
 *     responses are dropped and the loop continues). Responses from another
 *     epoch are discarded.
 *   - anything else: passed to tmqHandleNoPollRsp; when that reports a
 *     reset and pollIfReset is true, a new round of polls is sent.
 *
 * Returns the built result object, or NULL when no item is left or an
 * END_RSP was seen.
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  while (1) {
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
      // Local batch exhausted: pull whatever is pending and try once more.
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
    }

    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      // consumerId is 64-bit; PRId64 keeps the format correct where long is 32-bit.
      tscDebug("consumer %" PRId64 " actual process poll rsp", tmq->consumerId);
      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        if (pollRspWrapper->dataRsp.blockNum == 0) {
          // Nothing to hand to the user; keep draining.
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
          continue;
        }
        // build rsp
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
          continue;
        }

        // build rsp: without created tables it is returned as a plain data
        // result, otherwise as a metadata result.
        void* pRsp = NULL;
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        } else {
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
        }
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else {
      // reset stays false if tmqHandleNoPollRsp rejects the message type.
      bool reset = false;
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
    }
  }
}

1697
/*
 * Poll the consumer for the next result.
 *
 * tmq      consumer handle
 * timeout  upper bound in milliseconds (compared against
 *          taosGetTimestampMs deltas); -1 means wait until a result arrives
 *
 * Returns a result object, or NULL when the consumer is still in INIT
 * status, when recovery does not complete after the retry budget, when
 * terrno is TSDB_CODE_TQ_NO_COMMITTED_OFFSET after response handling, or
 * when the timeout expires without a result.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int64_t startTime = taosGetTimestampMs();

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    // A failed poll round is only logged; queued responses are still processed.
    if (tmqPollImpl(tmq, timeout) < 0) {
      tscDebug("return since poll err");
    }

    void* rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsedTime = taosGetTimestampMs() - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait for the time that is still left, not for the time already spent.
      // The *1000 scale matches the fixed wait below (unit of tsem_timewait
      // is not visible here -- TODO confirm).
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1755
/*
 * Close a consumer.
 *
 * For a consumer in READY status the current offsets are committed
 * synchronously and the consumer is re-subscribed with an empty topic
 * list, retrying while the server answers
 * TSDB_CODE_MND_CONSUMER_NOT_READY. In every status the consumer's
 * reference is then removed from the tmq reference set.
 *
 * Returns the commit error code if the synchronous commit fails (the
 * consumer is then left untouched), otherwise 0.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }
      retryCnt++;
      taosMsleep(500);
    }
    tmq_list_destroy(lst);
    // The unsubscribe result is deliberately not returned, as before.
  }

  // Release the reference on every successful close path; previously the
  // READY path returned early and never removed it.
  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1782

L
Liu Jicong 已提交
1783 1784
/*
 * Map a tmq error code to a message string: 0 -> "success", -1 -> "fail",
 * any other value is delegated to tstrerror.
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1792

L
Liu Jicong 已提交
1793 1794 1795 1796
/*
 * Classify a result object returned by the consumer.
 *
 * Data results map to TMQ_RES_DATA. Meta results map to
 * TMQ_RES_TABLE_META, except that a meta result whose message type is
 * TDMT_VND_DELETE is reported as TMQ_RES_DATA. Metadata results map to
 * TMQ_RES_METADATA; everything else is TMQ_RES_INVALID.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    return (pMeta->metaRsp.resMsgType == TDMT_VND_DELETE) ? TMQ_RES_DATA : TMQ_RES_TABLE_META;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }

  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1809
/*
 * Return the topic name of a tmq result, without the prefix that precedes
 * the first '.' in the stored name.
 *
 * If the stored name contains no '.', the whole stored name is returned
 * (the previous code computed NULL + 1 in that case). Returns NULL when
 * res is not a tmq data, meta or metadata result.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1824 1825 1826 1827
/*
 * Return the database name of a tmq result, without the prefix that
 * precedes the first '.' in the stored name.
 *
 * If the stored name contains no '.', the whole stored name is returned
 * (the previous code computed NULL + 1 in that case). Returns NULL when
 * res is not a tmq data, meta or metadata result.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1839 1840 1841 1842
/*
 * Return the vgroup id recorded in a tmq result, or -1 when res is not a
 * tmq data, meta or metadata result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1853 1854 1855 1856 1857 1858 1859 1860

/*
 * Return the table name of the block the result iterator currently points
 * at, for data and metadata results.
 *
 * NULL is returned when the response carries no table names, when the
 * iterator is outside [0, blockNum), or for any other result kind.
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       hasNames = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
    bool       inRange = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
    if (hasNames && inRange) {
      return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    bool            hasNames = pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL;
    bool            inRange = pTaosx->resIter >= 0 && pTaosx->resIter < pTaosx->rsp.blockNum;
    if (hasNames && inRange) {
      return (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter);
    }
    return NULL;
  }

  return NULL;
}
1872

L
Liu Jicong 已提交
1873
/*
 * Start a commit for msg through tmqCommitInner and hand cb/param to it.
 * The third and fourth arguments are passed as 0 and 1 (presumably
 * "automatic" off and "async" on -- confirm against tmqCommitInner).
 * The value returned by tmqCommitInner is discarded.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

1878 1879
/*
 * Commit for msg through tmqCommitInner with no callback and return its
 * result code. The third and fourth arguments are both 0 (presumably
 * "automatic" off and "async" off -- confirm against tmqCommitInner).
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}