clientTmq.c 57.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
31 32 33
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
L
Liu Jicong 已提交
34 35 36
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
37

L
Liu Jicong 已提交
38 39 40 41 42 43
// Generic view of a response item: tmqClearUnhandleMsg() reads queued items
// through this type. SMqAskEpRspWrapper and SMqPollRspWrapper start with the
// same two fields.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
44 45 46
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
47 48
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
49
struct tmq_list_t {
L
Liu Jicong 已提交
50
  SArray container;
L
Liu Jicong 已提交
51
};
L
Liu Jicong 已提交
52

L
Liu Jicong 已提交
53
struct tmq_conf_t {
54 55 56 57 58
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
59 60
  int8_t  snapEnable;
  int32_t snapBatchSize;
61 62 63

  bool hbBgEnable;

64 65 66 67 68
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
69
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
70
  void*          commitCbUserParam;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
74
  int64_t refId;
L
Liu Jicong 已提交
75
  // conf
76 77 78 79 80 81 82 83 84 85 86
  char    groupId[TSDB_CGROUP_LEN];
  char    clientId[256];
  int8_t  withTbName;
  int8_t  useSnapshot;
  int8_t  autoCommit;
  int32_t autoCommitInterval;
  int32_t resetOffsetCfg;
  int64_t consumerId;

  bool hbBgEnable;

L
Liu Jicong 已提交
87 88
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
89 90 91 92

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
93 94
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
95
  int32_t epSkipCnt;
L
Liu Jicong 已提交
96
#endif
L
Liu Jicong 已提交
97 98
  int64_t pollCnt;

L
Liu Jicong 已提交
99
  // timer
100 101
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
102 103 104
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
105 106 107 108
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
109
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
110
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
111
  STaosQall*  qall;
L
Liu Jicong 已提交
112 113 114 115
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
116 117
};

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
// Values for a vgroup's status (see SMqClientVg.vgStatus).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status values (presumably stored in tmq_t.status; the assignment is
// outside this part of the file).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
130
// Task types written to tmq_t.delayedTask by the timer callbacks and
// dispatched by tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
136
typedef struct {
137 138 139
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
142
  // connection info
143
  int32_t vgId;
X
Xiaoyu Wang 已提交
144
  int32_t vgStatus;
L
Liu Jicong 已提交
145
  int32_t vgSkipCnt;
146 147 148
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
149
typedef struct {
150
  // subscribe info
151 152
  char topicName[TSDB_TOPIC_FNAME_LEN];
  char db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
153 154 155

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
156
  SSchemaWrapper schema;
157 158
} SMqClientTopic;

L
Liu Jicong 已提交
159 160 161 162 163
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
164
  union {
L
Liu Jicong 已提交
165 166
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
167
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
168
  };
L
Liu Jicong 已提交
169 170
} SMqPollRspWrapper;

L
Liu Jicong 已提交
171
typedef struct {
172 173
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
174 175
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
176
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
177

L
Liu Jicong 已提交
178
typedef struct {
179 180
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
181
  int32_t code;
L
Liu Jicong 已提交
182
  int32_t async;
X
Xiaoyu Wang 已提交
183
  tsem_t  rspSem;
184 185
} SMqAskEpCbParam;

L
Liu Jicong 已提交
186
typedef struct {
187 188
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
189
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
190
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
191
  int32_t         vgId;
L
Liu Jicong 已提交
192
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
193
} SMqPollCbParam;
194

195
typedef struct {
196 197
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
198 199
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
200 201
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
202
  int32_t        rspErr;
203
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
204 205 206 207
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
208 209 210 211 212
} SMqCommitCbParamSet;

// Per-request context passed to tmqCommitCb().
typedef struct {
  SMqCommitCbParamSet* params;   // shared state of the enclosing commit call
  STqOffset*           pOffset;  // offset that was encoded into the request; freed in tmqCommitCb()
} SMqCommitCbParam;
216

217
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
218
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
219
  conf->withTbName = false;
L
Liu Jicong 已提交
220
  conf->autoCommit = true;
L
Liu Jicong 已提交
221
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
222
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
223
  conf->hbBgEnable = true;
224 225 226
  return conf;
}

L
Liu Jicong 已提交
227
/**
 * Release a configuration created by tmq_conf_new(), including the ip, user
 * and pass strings it owns. A NULL conf is ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
237
  if (strcmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
238
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
239
    return TMQ_CONF_OK;
240
  }
L
Liu Jicong 已提交
241

242
  if (strcmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
243
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
244 245
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
246

L
Liu Jicong 已提交
247 248
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
249
      conf->autoCommit = true;
L
Liu Jicong 已提交
250 251
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
252
      conf->autoCommit = false;
L
Liu Jicong 已提交
253 254 255 256
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
257
  }
L
Liu Jicong 已提交
258

L
Liu Jicong 已提交
259 260 261 262 263
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276 277
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
278

279 280
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
281
      conf->withTbName = true;
L
Liu Jicong 已提交
282
      return TMQ_CONF_OK;
283
    } else if (strcmp(value, "false") == 0) {
284
      conf->withTbName = false;
L
Liu Jicong 已提交
285
      return TMQ_CONF_OK;
286 287 288 289 290
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
291
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
292
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
293
      conf->snapEnable = true;
294 295
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
296
      conf->snapEnable = false;
297 298 299 300 301 302
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
303 304 305 306 307
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

308 309 310
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
311 312
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
313
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
314 315 316 317
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
318
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
319 320
  }

L
Liu Jicong 已提交
321
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
322 323 324
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
325
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
326 327 328
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
329
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
330 331 332
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
333
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
334 335 336
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
337
  if (strcmp(key, "td.connect.db") == 0) {
338
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
339 340 341
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
342
  return TMQ_CONF_UNKNOWN;
343 344 345
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
346 347
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
348 349
}

L
Liu Jicong 已提交
350 351
/**
 * Append a copy of a topic name to the list.
 *
 * The copy is passed through strtolower() unless the name starts with a
 * backquote.
 *
 * @return 0 on success; -1 when list or src is NULL, src is empty, or memory
 *         cannot be obtained (nothing is leaked in that case).
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) return -1;

  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  if (taosArrayPush(container, &topic) == NULL) {
    // the array did not take ownership; release with the same function
    // tmq_list_destroy() uses for stored elements
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
361
/**
 * Destroy a topic list and free every topic string stored in it.
 * A NULL list is ignored.
 */
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;

  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
366 367 368 369 370 371 372 373 374 375
/** Number of topics currently stored in the list. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/**
 * Expose the list's backing storage (the array's pData) as an array of C
 * strings. The returned pointer aliases the list; it is not a copy.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* topics = &list->container;
  char**        names = topics->pData;
  return names;
}

L
Liu Jicong 已提交
376 377 378 379
/**
 * Write "<topicName>:<vg>" into dst.
 *
 * @return the value returned by sprintf() (characters written, excluding the
 *         terminating NUL). dst must be large enough; no bound is checked.
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
/**
 * Finish a commit once no response is outstanding.
 *
 * Consumer no longer resolvable: free pParamSet (destroying the semaphore
 * first in synchronous mode), set terrno to TSDB_CODE_TMQ_CONSUMER_CLOSED and
 * return -1.
 * Synchronous mode: post rspSem; the waiter owns pParamSet.
 * Asynchronous mode: invoke the consumer's commitCb (automatic commits) or
 * the caller's userCb, when set, then free pParamSet.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId);
  if (tmq == NULL) {
    if (!pParamSet->async) tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  if (!pParamSet->async) {
    // wake the thread blocked in the synchronous commit
    tsem_post(&pParamSet->rspSem);
    return 0;
  }

  if (pParamSet->automatic) {
    if (tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    }
  } else if (pParamSet->userCb) {
    pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
  }
  taosMemoryFree(pParamSet);
  return 0;
}

L
Liu Jicong 已提交
412 413 414 415 416 417 418 419
/**
 * Atomically decrement the outstanding-response counter and run
 * tmqCommitDone() when it reaches zero.
 */
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
  int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(remaining >= 0);
  if (remaining != 0) return;
  tmqCommitDone(pParamSet);
}

420 421
/**
 * Response callback for one offset-commit request.
 *
 * Frees the offset that was sent and the response payload, then counts the
 * response down on the shared parameter set. Always returns 0.
 *
 * NOTE(review): `code` is not stored anywhere (pParamSet->rspErr is left
 * untouched), so a failed commit is not reported to the waiter/callback —
 * confirm whether that is intended.
 */
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  taosMemoryFree(pParam->pOffset);
  taosMemoryFree(pBuf->pData);

  tmqCommitRspCountDown(pParamSet);

  return 0;
}

L
Liu Jicong 已提交
443 444 445 446 447 448
/**
 * Build and send one TDMT_VND_TMQ_COMMIT_OFFSET request for a vgroup.
 *
 * The committed value is pVg->currentOffset and the subscription key is
 * "<groupId><TMQ_SEPARATOR><topicName>". On success the request is counted in
 * pParamSet (waitingRspNum and totalRspNum) and tmqCommitCb() receives the
 * response.
 *
 * @return 0 when the request was handed to the transport, -1 on failure; on
 *         failure every buffer allocated here is released and pParamSet is
 *         not modified.
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffset;

  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    // this path used to return without releasing pOffset
    taosMemoryFree(pOffset);
    return -1;
  }
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  // the encoded offset follows the message head
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    return -1;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    return -1;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;

  // account for the request before it can complete
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;
}

/**
 * Commit the offset of the (topic, vgroup) that a received message belongs to.
 *
 * At most one request is sent: for the first matching vgroup whose current
 * offset has a positive type and differs from the committed offset.
 *
 * @param tmq       consumer
 * @param msg       message result; must be a TMQ data, meta or metadata result
 * @param async     zero: wait for the response and return its result;
 *                  non-zero: return once the request is sent
 * @param userCb    callback for async mode (may be NULL)
 * @param userParam passed back to userCb
 * @return TSDB_CODE_TMQ_INVALID_MSG for an unsupported msg type; -1 when the
 *         parameter set cannot be allocated or, in synchronous mode, when the
 *         request cannot be sent; 0 when nothing needed committing or an
 *         async request was issued (an async send failure is reported through
 *         userCb); otherwise the synchronous commit result.
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = -1;
  // Tracked locally: once an async request is in flight its completion may
  // free pParamSet, so the struct must not be read again on that path.
  bool sent = false;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
        }
        sent = true;
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (!sent) {
    // nothing to commit; pParamSet is still exclusively ours
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (async) {
    // the result is delivered through tmqCommitDone(), which also frees pParamSet
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;

FAIL:
  // the request never left, so no callback will ever touch pParamSet
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  if (!async) {
    return code;
  }
  if (userCb) {
    userCb(tmq, code, userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
599 600
/**
 * Commit the current offset of every vgroup of every subscribed topic whose
 * offset has a positive type and differs from the committed one.
 *
 * @param tmq       consumer
 * @param automatic non-zero for timer-driven commits (reported through
 *                  tmq->commitCb rather than userCb)
 * @param async     zero: wait for all responses and return the result;
 *                  non-zero: return once the requests are sent
 * @param userCb    callback for non-automatic async commits (may be NULL)
 * @param userParam passed back to userCb
 * @return -1 when the parameter set cannot be allocated (the async callback,
 *         if any, receives TSDB_CODE_OUT_OF_MEMORY); 0 when nothing was sent
 *         or the async requests were issued; otherwise the synchronous
 *         commit result.
 */
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                                     void* userParam) {
  int32_t code = -1;

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    if (async) {
      // either callback may be unset
      if (automatic) {
        if (tmq->commitCb) tmq->commitCb(tmq, code, tmq->commitCbUserParam);
      } else if (userCb) {
        userCb(tmq, code, userParam);
      }
    }
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue: responses arriving while requests
  // are still being sent cannot drive the counter to zero
  pParamSet->waitingRspNum = 1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          // best effort: a vgroup that cannot be committed is skipped
          continue;
        }
      }
    }
  }

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // count down since waiting rsp num init as 1
  tmqCommitRspCountDown(pParamSet);

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
  } else {
    // requests were issued; the outcome is delivered through the callback and
    // pParamSet now belongs to tmqCommitDone()
    code = 0;
  }

  return code;
}

/**
 * Commit entry point: with a message, commit only the (topic, vgroup) it came
 * from; without one, commit every vgroup of every subscribed topic.
 * `automatic` is only used for the whole-consumer commit.
 */
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (msg) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  } else {
    return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
  }
}

683
void tmqAssignAskEpTask(void* param, void* tmrId) {
684 685 686 687 688 689 690 691 692
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
693 694 695
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
696 697 698 699 700 701 702 703 704
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
705 706 707
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
708 709 710 711 712 713 714 715 716
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
717 718
}

719 720 721 722 723 724
/**
 * Heartbeat response callback: release the response payload, if there is one.
 * param and code are not used. Always returns 0.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) return 0;
  if (pMsg->pData == NULL) return 0;
  taosMemoryFree(pMsg->pData);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
725 726 727
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
728
    taosMemoryFree(param);
729 730
    return;
  }
D
dapan1121 已提交
731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

  int32_t      tlen = tSerializeSMqHbReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
    return;
  }
  void *pReq = taosMemoryCalloc(1, tlen);
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
    return;
  }
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
    return;
  }
751 752 753 754

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
755
    goto OVER;
756 757 758
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
D
dapan1121 已提交
759
      .len = tlen,
760 761 762 763 764 765 766
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
767
  sendInfo->msgType = TDMT_MND_TMQ_HB;
768 769 770 771 772 773 774

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
775
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
776 777
}

L
Liu Jicong 已提交
778 779 780 781 782 783 784 785
/**
 * Drain the consumer's delayed-task queue and run every task.
 *
 * ASK_EP: call tmqAskEp(tmq, true) and re-arm epTimer with a delay of 1000.
 * COMMIT: issue an automatic async commit and re-arm commitTimer with
 *         tmq->autoCommitInterval.
 * REPORT: nothing to do.
 *
 * @return 0 normally; -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the
 *         temporary item list cannot be allocated. If the refId copy handed
 *         to a timer cannot be allocated, that timer is not re-armed.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(tmq, true);

      // the timer callback takes ownership of this copy and frees it
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
      } else {
        tscError("failed to allocate refId for ask ep timer");
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = tmq->refId;
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
      } else {
        tscError("failed to allocate refId for commit timer");
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
810
/**
 * Discard every pending response: first the items already moved into
 * tmq->qall, then everything still waiting in tmq->mqueue.
 *
 * The item pointer is cleared before each taosGetQitem() call, as in
 * tmqHandleAllDelayedTask(), so a stale pointer can never be freed twice.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* msg = NULL;

  while (1) {
    msg = NULL;
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg == NULL) break;
    taosFreeQitem(msg);
  }

  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    msg = NULL;
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg == NULL) break;
    taosFreeQitem(msg);
  }
}

D
dapan1121 已提交
831
/**
 * Subscribe response callback: store the result code in the
 * SMqSubscribeCbParam and post its semaphore. pMsg is not used. Always
 * returns 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  tsem_post(&pParam->rspSem);
  return 0;
}
837

L
Liu Jicong 已提交
838
/**
 * Append the consumer's subscribed topic names to *topics, creating the list
 * when *topics is NULL.
 *
 * Each stored name is the part after the first '.' of the internal topic
 * name; a name without a '.' is appended unchanged.
 *
 * @return 0 on success, -1 when the list cannot be created or a name cannot
 *         be appended (names appended before the failure stay in the list).
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) return -1;
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    // strchr() returns NULL when there is no '.', so never add 1 blindly
    const char* dot = strchr(topic->topicName, '.');
    const char* name = (dot != NULL) ? dot + 1 : topic->topicName;
    if (tmq_list_append(*topics, name) < 0) return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
849
/**
 * Drop all subscriptions by subscribing to an empty topic list.
 *
 * While tmq_subscribe() reports TSDB_CODE_MND_CONSUMER_NOT_READY the call is
 * retried after taosMsleep(500), until retryCnt exceeds 5.
 *
 * @return the last tmq_subscribe() result, or TSDB_CODE_OUT_OF_MEMORY when
 *         the empty list cannot be created.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  int32_t     rsp;
  int32_t     retryCnt = 0;
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    } else {
      retryCnt++;
      taosMsleep(500);
    }
  }

  tmq_list_destroy(lst);
  return rsp;
}

867 868
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
869

870 871 872 873
  // TODO stop timer
  if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
  if (tmq->qall) taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
874

875
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
876

877 878 879
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
880
    if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
881 882 883 884 885 886
    int32_t vgSz = taosArrayGetSize(pTopic->vgs);
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
887 888
}

L
Liu Jicong 已提交
889
// Creates a consumer from the given configuration.
//
// On the first call the shared TMQ timer and the consumer ref set are set up.
// The consumer then gets its queues, a copy of the configuration, a generated
// consumer id, a response semaphore and a connection, and is registered in the
// ref set. When conf->hbBgEnable is set, a heartbeat timer is started.
//
// conf:              consumer configuration; groupId must be non-empty.
// errstr, errstrLen: not used by this implementation.
// Returns the new consumer, or NULL on failure.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("setting up new consumer failed since %s, consumer group %s", terrstr(), conf->groupId);
    return NULL;
  }

  // Parameter handed to the heartbeat timer; allocated before the consumer is
  // registered so that an allocation failure can still take the FAIL path.
  int64_t* pHbRefId = NULL;

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId has not been filled in yet, so log the configured group.
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             conf->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  if (pTmq->hbBgEnable) {
    pHbRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pHbRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      goto FAIL;
    }
  }

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    taosMemoryFree(pHbRefId);
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pTmq->hbBgEnable) {
    *pHbRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pHbRefId, tmqMgmt.timer);
  }

  tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);

  return pTmq;

FAIL:
  // Only reached before the connection exists and with no live semaphore.
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pHbRefId);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
992
// Subscribes the consumer to the topics in topic_list and blocks until the
// request has been answered and the consumer's endpoints have been fetched.
//
// Steps: build one full topic name per list entry, serialize and send a
// TDMT_MND_TMQ_SUBSCRIBE request, wait for tmqSubscribeCb to post the result,
// poll tmqAskEp() until it stops reporting "consumer not ready", then start
// the ask-ep timer and, when auto commit is enabled, the commit timer (each
// only if not already running).
//
// Returns 0 on success; otherwise the code delivered with the response,
// TSDB_CODE_MND_CONSUMER_NOT_READY when the retries are exhausted,
// TSDB_CODE_OUT_OF_MEMORY when a timer parameter cannot be allocated, or -1
// for failures while preparing the request.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  tscDebug("call tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    // The array only owns the name once the push has succeeded.
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      taosMemoryFree(topicFName);
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // code is still 0 here; without this the caller would be told the
      // subscription succeeded although the consumer never became ready.
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  // Shared exit: also taken on success, when buf and sendInfo are NULL again.
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1104
// Stores a commit callback and its user parameter in the configuration.
// tmq_consumer_new() copies both fields into the consumer it creates.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1109

D
dapan1121 已提交
1110
// Response callback for a poll request sent to one vgroup.
//
// param: SMqPollCbParam allocated by the sender; freed here.
// pMsg:  response buffer; pData and pEpSet are freed here on every path.
// code:  result code of the request.
//
// On success the response is decoded into a SMqPollRspWrapper that is written
// to tmq->mqueue. On failure the vgroup is set back to idle if the request's
// epoch still matches the consumer's. tmq->rspSem is posted on every path on
// which the consumer could be acquired.
//
// Returns 0 when the response was queued or dropped as belonging to an earlier
// epoch, -1 otherwise.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  int64_t         refId = pParam->refId;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
  taosMemoryFree(pParam);
  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    // Every other path frees the ep-set; it used to leak on this one.
    taosMemoryFree(pMsg->pEpSet);
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosReleaseRef(tmqMgmt.rsetId, refId);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // The body follows the SMqRspHead; after decoding, the head is copied over
  // the start of the decoded response structure.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(0);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  // Drop the reference taken by taosAcquireRef(); tmq must not be used after
  // this point.
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
CREATE_MSG_FAIL:
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return -1;
}

L
Liu Jicong 已提交
1223
// Replaces the consumer's topic/vgroup table with the one carried by an
// ask-ep response and moves the consumer to the given epoch.
//
// Offsets of vgroups that are present both before and after the update are
// carried over, matched by the key "<topicName>:<vgId>"; new vgroups start
// with an offset whose type is tmq->resetOffsetCfg. Ownership of each topic's
// schema moves from pRsp to the consumer (the fields in pRsp are cleared).
// The consumer status becomes NO_TOPIC when the new table is empty, READY
// otherwise.
//
// Returns true if at least one vgroup was installed; false if there was none
// or the temporary containers could not be allocated (in which case nothing
// is changed).
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // Remember the current offset of every vgroup the consumer knows about.
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  // Build the new table from the response.
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    // Take over the schema so that it has exactly one owner.
    topic.schema = pTopicEp->schema;
    pTopicEp->schema.nCols = 0;
    pTopicEp->schema.pSchema = NULL;
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // Release the old table before installing the new one.
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
      taosArrayDestroy(pTopic->vgs);
    }
    taosArrayDestroy(tmq->clientTopics);
  }
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1317
// Response callback for the ask-ep request sent by tmqAskEp().
//
// param: SMqAskEpCbParam allocated by tmqAskEp(). In sync mode
//        (param->async == 0) the sender waits on param->rspSem and frees the
//        param itself, so this callback only stores the code and posts the
//        semaphore. In async mode the param is destroyed here.
// pMsg:  response buffer; pData and pEpSet are freed here.
// code:  result code of the request.
//
// A response whose epoch is not newer than the consumer's is ignored. In sync
// mode a newer response is applied right away through tmqUpdateEp(); in async
// mode it is wrapped and written to tmq->mqueue.
//
// Returns the request's code, or -1 when the consumer no longer exists or the
// wrapper could not be allocated.
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  int64_t          refId = pParam->refId;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    if (!async) {
      // The sender is blocked on this semaphore: wake it up with an error
      // instead of destroying the semaphore underneath it.
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      tsem_destroy(&pParam->rspSem);
      taosMemoryFree(pParam);
    }
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d, code %x", tmq->consumerId,
             pParam->async, code);
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  if (!async) {
    // pParam belongs to the waiter from here on.
    tsem_post(&pParam->rspSem);
  } else {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  // Drop the reference taken by taosAcquireRef(); tmq must not be used after
  // this point.
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return code;
}

L
Liu Jicong 已提交
1384
// Sends a TDMT_MND_TMQ_ASK_EP request for this consumer; tmqAskEpCb handles
// the response.
//
// tmq:   the consumer.
// async: when false, blocks until tmqAskEpCb has posted the response
//        semaphore and returns the code it stored; when true, returns right
//        after the request has been handed to the transport and the callback
//        owns the request parameter.
//
// Returns 0 (async) or the response code (sync); -1 when the request could
// not be built.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = 0;

  SMqAskEpReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
  strcpy(req.cgroup, tmq->groupId);

  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("tSerializeSMqAskEpReq failed");
    return -1;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("failed to malloc askEpReq msg, size:%d", tlen);
    return -1;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqAskEpReq %d failed", tlen);
    taosMemoryFree(pReq);
    return -1;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(pReq);
    return -1;
  }
  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // In sync mode the callback only posts the semaphore; the param is ours.
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

D
dapan1121 已提交
1464
// Fills in a poll request for one vgroup of one topic.
//
// pReq:    request to fill in.
// tmq:     consumer supplying the group id, consumer id, epoch and the
//          withTbName / useSnapshot settings.
// timeout: value stored in pReq->timeout.
// pTopic:  topic whose name becomes the second half of the subscription key.
// pVg:     vgroup supplying the target vgId and the offset to request.
void tmqBuildConsumeReqImpl(SMqPollReq *pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // subKey = <groupId> TMQ_SEPARATOR <topicName>
  int32_t cgroupLen = strlen(tmq->groupId);
  char*   cursor = pReq->subKey;
  memcpy(cursor, tmq->groupId, cgroupLen);
  cursor += cgroupLen;
  *cursor = TMQ_SEPARATOR;
  cursor++;
  strcpy(cursor, pTopic->topicName);

  // Settings taken from the consumer.
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;

  // Per-request values.
  pReq->timeout = timeout;
  pReq->reqId = generateRequestId();

  // Values taken from the vgroup.
  pReq->reqOffset = pVg->currentOffset;
  pReq->head.vgId = pVg->vgId;
}

L
Liu Jicong 已提交
1486 1487
// Builds a RES_TYPE__TMQ_META result object from a queued poll response.
// Topic name, db name and vgId are copied from the wrapper's handles and the
// meta response is copied byte-wise into the result.
//
// Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when it
// cannot be allocated.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1497 1498 1499
// Builds a RES_TYPE__TMQ result object from a queued poll response.
// Topic name, db name and vgId are copied from the wrapper's handles, the data
// response is copied byte-wise, and the block iterator starts at -1. When the
// response carries no schema of its own, the topic's schema is installed into
// the result info.
//
// Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when it
// cannot be allocated.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1515 1516
// Builds a RES_TYPE__TMQ_METADATA result object from a queued poll response.
// Topic name, db name and vgId are copied from the wrapper's handles, the
// taosx response is copied byte-wise, and the block iterator starts at -1.
// When the response carries no schema of its own, the topic's schema is
// installed into the result info.
//
// Returns the new object, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) when it
// cannot be allocated.
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1533
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1534
  /*tscDebug("call poll");*/
X
Xiaoyu Wang 已提交
1535 1536 1537 1538 1539 1540
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1541
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1542 1543
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
X
Xiaoyu Wang 已提交
1544
        continue;
L
Liu Jicong 已提交
1545
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1546 1547 1548 1549
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
S
Shengliang Guan 已提交
1550
        tscDebug("consumer:%" PRId64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1551 1552
        }
#endif
X
Xiaoyu Wang 已提交
1553
      }
L
Liu Jicong 已提交
1554
      atomic_store_32(&pVg->vgSkipCnt, 0);
D
dapan1121 已提交
1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      char *msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      
      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1573
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1574
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1575 1576
        return -1;
      }
D
dapan1121 已提交
1577
      
wafwerar's avatar
wafwerar 已提交
1578
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1579
      if (pParam == NULL) {
D
dapan1121 已提交
1580
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1581
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1582
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1583 1584
        return -1;
      }
1585 1586 1587
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;

L
Liu Jicong 已提交
1588
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1589
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1590
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1591

1592
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1593
      if (sendInfo == NULL) {
D
dapan1121 已提交
1594
        taosMemoryFree(msg);
wafwerar's avatar
wafwerar 已提交
1595
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1596
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1597
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1598 1599 1600 1601
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
D
dapan1121 已提交
1602 1603
          .pData = msg,
          .len = msgSize,
X
Xiaoyu Wang 已提交
1604 1605
          .handle = NULL,
      };
D
dapan1121 已提交
1606
      sendInfo->requestId = req.reqId;
X
Xiaoyu Wang 已提交
1607
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1608
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1609
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1610
      sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
X
Xiaoyu Wang 已提交
1611 1612

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1613
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1614

1615
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1616
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
L
Liu Jicong 已提交
1617
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
D
dapan1121 已提交
1618
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
S
Shengliang Guan 已提交
1619
      /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1620 1621 1622 1623 1624 1625 1626 1627
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1628 1629
/*
 * Handle a queued response that is not a poll response.
 *
 * Only TMQ_MSG_TYPE__EP_RSP is understood; any other type yields -1 and leaves
 * *pReset untouched. For an endpoint response, *pReset is set to true when the
 * response carries an epoch newer than the consumer's current one (the
 * endpoint information is then applied through tmqUpdateEp() and the message
 * is passed to tDeleteSMqAskEpRsp()), and to false otherwise.
 *
 * Returns 0 for an endpoint response, -1 for anything else.
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  // stale or duplicate endpoint info: nothing to apply
  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    *pReset = false;
    return 0;
  }

  SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
  tmqUpdateEp(tmq, rspWrapper->epoch, pEpRsp);
  tDeleteSMqAskEpRsp(pEpRsp);
  *pReset = true;
  return 0;
}

L
Liu Jicong 已提交
1647
/*
 * Drain queued responses until one of them produces a result object.
 *
 * Items are taken from tmq->qall; when it is empty it is refilled once from
 * tmq->mqueue. Every dequeued item is released with taosFreeQitem() after it
 * has been handled.
 *
 * Returns:
 *   - a response object built from the first data / meta / taosx poll
 *     response whose epoch matches the consumer's current epoch (data and
 *     taosx responses with blockNum == 0 are consumed silently),
 *   - NULL with terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET on an END response,
 *   - NULL when both queues are empty.
 *
 * Poll responses from a different epoch are logged and discarded. Any other
 * message type goes through tmqHandleNoPollRsp(); if that reports a reset and
 * pollIfReset is true, a new round of polls is issued with tmqPollImpl().
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      // local batch exhausted: pull everything pending from the shared queue
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        return NULL;
      }
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      tscDebug("consumer %" PRId64 " actual process poll rsp", tmq->consumerId);

      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->dataRsp.head.epoch != consumerEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 pPoll->dataRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      // advance the vgroup offset and allow it to be polled again
      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffset = pPoll->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->dataRsp.blockNum == 0) {
        taosFreeQitem(pPoll);
        continue;
      }

      SMqRspObj* pDataObj = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pDataObj;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;

      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->metaRsp.head.epoch != consumerEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 pPoll->metaRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffset = pPoll->metaRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaObj = tmqBuildMetaRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pMetaObj;
    }

    if (pItem->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;

      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->taosxRsp.head.epoch != consumerEpoch) {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pPoll->taosxRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffset = pPoll->taosxRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->taosxRsp.blockNum == 0) {
        taosFreeQitem(pPoll);
        continue;
      }

      // without created tables the response is exposed as a plain data result
      void* pResObj = NULL;
      if (pPoll->taosxRsp.createTableNum == 0) {
        pResObj = tmqBuildRspFromWrapper(pPoll);
      } else {
        pResObj = tmqBuildTaosxRspFromWrapper(pPoll);
      }
      taosFreeQitem(pPoll);
      return pResObj;
    }

    // everything else is treated as a non-poll (endpoint) response
    bool reset = false;
    tmqHandleNoPollRsp(tmq, pItem, &reset);
    taosFreeQitem(pItem);
    if (pollIfReset && reset) {
      tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1751
/*
 * Poll the consumer for the next result.
 *
 * timeout is compared against differences of taosGetTimestampMs() values, so
 * it is a budget in milliseconds; the value -1 means "no deadline".
 *
 * Returns a result object as soon as tmqHandleAllRsp() produces one. Returns
 * NULL when:
 *   - the consumer is still in TMQ_CONSUMER_STATUS__INIT,
 *   - the consumer is in TMQ_CONSUMER_STATUS__RECOVER and tmqAskEp() keeps
 *     answering TSDB_CODE_MND_CONSUMER_NOT_READY after the retry limit,
 *   - terrno equals TSDB_CODE_TQ_NO_COMMITTED_OFFSET after a round without
 *     result,
 *   - the timeout budget has been used up.
 *
 * NOTE(review): terrno is not cleared before tmqHandleAllRsp() is called, so a
 * stale TSDB_CODE_TQ_NO_COMMITTED_OFFSET left by earlier code on this thread
 * would also end the loop - confirm whether that is intended.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:%" PRId64 ", start poll at %" PRId64, tmq->consumerId, startTime);

  // nothing is subscribed yet: return immediately without touching delayed tasks
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:%" PRId64 ", poll return since consumer status is init", tmq->consumerId);
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);

    // a failed send is only logged; responses already queued are still served
    if (tmqPollImpl(tmq, timeout) < 0) {
      tscDebug("return since poll err");
    }

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:%" PRId64 ", return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout != -1) {
      int64_t endTime = taosGetTimestampMs();
      int64_t elapsedTime = endTime - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", end time %" PRId64,
                 tmq->consumerId, tmq->epoch, startTime, endTime);
        return NULL;
      }
      // wait for what is left of the budget, not for the time already spent;
      // the * 1000 scaling matches the fixed wait in the branch below
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1815
/*
 * Close a consumer.
 *
 * When the consumer is in TMQ_CONSUMER_STATUS__READY, the current offsets are
 * committed synchronously and the consumer then subscribes to an empty topic
 * list, retrying while the server answers TSDB_CODE_MND_CONSUMER_NOT_READY
 * (at most a handful of times, 500 ms apart). The outcome of that empty
 * subscription is not reported to the caller.
 *
 * In every case except a failed commit, the consumer's entry is removed from
 * the tmq reference set afterwards, so the handle must not be used after a
 * successful return.
 *
 * Returns 0 on success, or the error code of tmq_commit_sync() when the final
 * commit fails (the consumer is left registered in that case).
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      } else {
        retryCnt++;
        taosMsleep(500);
      }
    }

    tmq_list_destroy(lst);
    // fall through: the reference must be released on this path as well
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1842

L
Liu Jicong 已提交
1843 1844
/*
 * Translate a tmq return code into a message string.
 *
 * 0 maps to "success" and -1 to "fail"; every other value is delegated to
 * tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1852

L
Liu Jicong 已提交
1853 1854 1855 1856
/*
 * Classify a result object returned by the consumer.
 *
 * Data results map to TMQ_RES_DATA, metadata results to TMQ_RES_METADATA and
 * anything unrecognized to TMQ_RES_INVALID. A meta result is reported as
 * TMQ_RES_TABLE_META, except when its message type is TDMT_VND_DELETE, which
 * is reported as TMQ_RES_DATA.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    return (pMeta->metaRsp.resMsgType == TDMT_VND_DELETE) ? TMQ_RES_DATA : TMQ_RES_TABLE_META;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }

  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1869
/*
 * Return the topic name of a consumer result, without the prefix that
 * precedes the first '.' of the stored name.
 *
 * The returned pointer refers to storage inside the result object. If the
 * stored name contains no '.', the whole name is returned instead of a
 * pointer computed from a NULL strchr() result.
 *
 * Returns NULL when res is not a tmq data, meta or metadata result.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1884 1885 1886 1887
/*
 * Return the database name of a consumer result, without the prefix that
 * precedes the first '.' of the stored name.
 *
 * The returned pointer refers to storage inside the result object. If the
 * stored name contains no '.', the whole name is returned instead of a
 * pointer computed from a NULL strchr() result.
 *
 * Returns NULL when res is not a tmq data, meta or metadata result.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1899 1900 1901 1902
/*
 * Return the vgroup id recorded in a consumer result, or -1 when res is not a
 * tmq data, meta or metadata result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
1913 1914 1915 1916 1917 1918 1919 1920

/*
 * Return the table name associated with the block the result iterator
 * currently points at.
 *
 * Only data and metadata results can carry table names. NULL is returned when
 * the response was produced without table names, when the name array is
 * missing, when the iterator is outside [0, blockNum), or when res is of any
 * other kind.
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       hasNames = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
    bool       iterValid = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
    if (hasNames && iterValid) {
      return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    bool            hasNames = pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL;
    bool            iterValid = pTaosx->resIter >= 0 && pTaosx->resIter < pTaosx->rsp.blockNum;
    if (hasNames && iterValid) {
      return (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter);
    }
    return NULL;
  }

  return NULL;
}
1932

L
Liu Jicong 已提交
1933
/*
 * Commit offsets without returning a status to the caller.
 *
 * Thin wrapper around tmqCommitInner(): msg, cb and param are forwarded
 * unchanged, and the two numeric arguments are fixed to 0 and 1.
 * NOTE(review): those two arguments presumably select "not automatic" and
 * "asynchronous" - confirm against tmqCommitInner().
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

1938 1939
/*
 * Commit offsets and return the result of the commit.
 *
 * Thin wrapper around tmqCommitInner(): msg is forwarded unchanged, both
 * numeric arguments are 0 and no callback or callback parameter is supplied.
 * NOTE(review): the two zeros presumably select "not automatic" and
 * "synchronous" - confirm against tmqCommitInner().
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}