clientTmq.c 60.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34
// Disabled debugging aid: when enabled, every tsem_post call site logs its function and line
// before posting the semaphore.
// NOTE(review): the replacement expands to two statements without a do { } while (0) wrapper,
// so it would misbehave after an unbraced `if` -- wrap it before re-enabling.
#if 0
#undef tsem_post
#define tsem_post(x)                                         \
  tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \
  sem_post(x)
#endif

35
struct SMqMgmt{
36 37 38
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
39
};
L
Liu Jicong 已提交
40

41 42 43
// One-time initialisation state for the TMQ client module: tmq_consumer_new runs tmqMgmtInit
// through taosThreadOnce(&tmqInit, ...) and then inspects tmqInitRes.
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT;   // initialize only once
volatile int32_t    tmqInitRes = 0;                // initialize rsp code
static struct SMqMgmt tmqMgmt = {0};               // timer handle and reference-set id shared by all consumers
44

L
Liu Jicong 已提交
45 46 47 48 49 50
// Common header of every response item queued on tmq_t::mqueue. The concrete wrappers
// (SMqAskEpRspWrapper, SMqPollRspWrapper) start with the same two fields, so a queued item is
// first read through this type and then cast according to tmqRspType (see tmqFreeRspWrapper).
typedef struct {
  int8_t  tmqRspType;  // one of the TMQ_MSG_TYPE__* values
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
51 52 53
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
54 55
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
56
struct tmq_list_t {
L
Liu Jicong 已提交
57
  SArray container;
L
Liu Jicong 已提交
58
};
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
struct tmq_conf_t {
61 62 63 64 65
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
66 67
  int8_t  snapEnable;
  int32_t snapBatchSize;
68
  bool    hbBgEnable;
69

70 71 72 73 74
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
75
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
76
  void*          commitCbUserParam;
L
Liu Jicong 已提交
77 78 79
};

struct tmq_t {
80
  int64_t  refId;
L
Liu Jicong 已提交
81
  // conf
82 83 84 85 86 87 88 89 90
  char     groupId[TSDB_CGROUP_LEN];
  char     clientId[256];
  int8_t   withTbName;
  int8_t   useSnapshot;
  int8_t   autoCommit;
  int32_t  autoCommitInterval;
  int32_t  resetOffsetCfg;
  uint64_t consumerId;
  bool     hbBgEnable;
91

L
Liu Jicong 已提交
92 93
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
94 95 96 97

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
98 99
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
100
  int32_t epSkipCnt;
L
Liu Jicong 已提交
101
#endif
L
Liu Jicong 已提交
102 103
  int64_t pollCnt;

L
Liu Jicong 已提交
104
  // timer
105 106
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
107 108 109
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
110 111 112 113
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
114
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
115
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
116
  STaosQall*  qall;
L
Liu Jicong 已提交
117 118 119 120
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
121 122
};

X
Xiaoyu Wang 已提交
123 124 125 126 127 128 129 130
// Values for SMqClientVg::vgStatus (presumably whether a request to that vgroup is
// outstanding -- the code using them is outside this part of the file).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer life-cycle states (presumably stored in tmq_t::status -- the code that sets it is
// outside this part of the file).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
135
// Kinds of items placed on tmq_t::delayedTask by the timer callbacks and dispatched by
// tmqHandleAllDelayedTask. Numbering starts at 1.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
141
typedef struct {
142 143 144
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
145 146
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
147
  // connection info
148
  int32_t vgId;
X
Xiaoyu Wang 已提交
149
  int32_t vgStatus;
L
Liu Jicong 已提交
150
  int32_t vgSkipCnt;
151 152 153
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
154
typedef struct {
155
  // subscribe info
156 157 158
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
159
  SSchemaWrapper schema;
160 161
} SMqClientTopic;

L
Liu Jicong 已提交
162 163 164 165 166
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
167
  union {
L
Liu Jicong 已提交
168 169
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
170
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
171
  };
L
Liu Jicong 已提交
172 173
} SMqPollRspWrapper;

L
Liu Jicong 已提交
174
typedef struct {
175 176
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
177 178
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
179
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
180

L
Liu Jicong 已提交
181
typedef struct {
182 183
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
184
  int32_t code;
L
Liu Jicong 已提交
185
  int32_t async;
X
Xiaoyu Wang 已提交
186
  tsem_t  rspSem;
187 188
} SMqAskEpCbParam;

L
Liu Jicong 已提交
189
typedef struct {
190 191
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
192
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
193
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
194
  int32_t         vgId;
L
Liu Jicong 已提交
195
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
196
} SMqPollCbParam;
197

198
typedef struct {
199 200
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
201 202
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
203 204
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
205
  int32_t        rspErr;
206
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
207 208 209 210
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
211 212 213 214 215
} SMqCommitCbParamSet;

// Per-request context of a single offset-commit message (see tmqSendCommitReq / tmqCommitCb).
typedef struct {
  SMqCommitCbParamSet* params;   // shared state of the whole commit operation
  STqOffset*           pOffset;  // offset that was sent; freed in tmqCommitCb
  /*char                 topicName[TSDB_TOPIC_FNAME_LEN];*/
  /*int32_t              vgId;*/
} SMqCommitCbParam;
219

220 221
static int32_t tmqAskEp(tmq_t* tmq, bool async);

222
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
223
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
224 225 226 227 228
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

229
  conf->withTbName = false;
L
Liu Jicong 已提交
230
  conf->autoCommit = true;
L
Liu Jicong 已提交
231
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
232
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
233
  conf->hbBgEnable = true;
234

235 236 237
  return conf;
}

L
Liu Jicong 已提交
238
/**
 * Release a configuration together with the connection strings it owns.
 *
 * @param conf configuration to free; NULL is ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  // ip, user and pass are the heap copies made by tmq_conf_set
  char* owned[3] = {conf->ip, conf->user, conf->pass};
  for (int32_t i = 0; i < 3; ++i) {
    if (owned[i] != NULL) {
      taosMemoryFree(owned[i]);
    }
  }

  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
248
  if (strcmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
249
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
250
    return TMQ_CONF_OK;
251
  }
L
Liu Jicong 已提交
252

253
  if (strcmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
254
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
255 256
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
257

L
Liu Jicong 已提交
258 259
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
260
      conf->autoCommit = true;
L
Liu Jicong 已提交
261 262
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
263
      conf->autoCommit = false;
L
Liu Jicong 已提交
264 265 266 267
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
268
  }
L
Liu Jicong 已提交
269

L
Liu Jicong 已提交
270 271 272 273 274
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
289

290 291
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
292
      conf->withTbName = true;
L
Liu Jicong 已提交
293
      return TMQ_CONF_OK;
294
    } else if (strcmp(value, "false") == 0) {
295
      conf->withTbName = false;
L
Liu Jicong 已提交
296
      return TMQ_CONF_OK;
297 298 299 300 301
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
302
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
303
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
304
      conf->snapEnable = true;
305 306
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
307
      conf->snapEnable = false;
308 309 310 311 312 313
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
314 315 316 317 318
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

319 320 321
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
322 323
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
324
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
325 326 327 328
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
329
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
330 331
  }

L
Liu Jicong 已提交
332
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
333 334 335
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
336
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
337 338 339
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
340
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
341 342 343
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
344
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
345 346 347
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
348
  if (strcmp(key, "td.connect.db") == 0) {
349
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
350 351 352
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
353
  return TMQ_CONF_UNKNOWN;
354 355 356
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
357 358
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
359 360
}

L
Liu Jicong 已提交
361 362
/**
 * Append a copy of a topic name to the list.
 *
 * Names that do not start with a back-quote are lower-cased; back-quoted names are stored
 * verbatim.
 *
 * @param list destination list
 * @param src  topic name; must be a non-empty string
 * @return 0 on success, -1 on a NULL/empty argument or allocation failure.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = strdup(src);
  if (topic == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  if (taosArrayPush(&list->container, &topic) == NULL) {
    // the array did not take ownership, so release the copy here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
372
/**
 * Destroy a topic list: each stored name is released with taosMemoryFree, then the array itself.
 *
 * @param list list to destroy
 */
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
377 378 379 380 381 382 383 384 385 386
/**
 * @param list topic list
 * @return number of topic names stored in the list.
 */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/**
 * Expose the list's backing storage as a C array of topic-name pointers.
 *
 * @param list topic list
 * @return the array's pData buffer; it is not a copy.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
387 388 389 390
/**
 * Format "<topicName>:<vg>" into dst.
 *
 * @param dst       output buffer; the caller must make it large enough, the write is unbounded
 * @param topicName topic name
 * @param vg        vgroup id
 * @return number of characters written, excluding the terminating NUL.
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
/**
 * Finish a commit operation once no more responses are outstanding.
 *
 * Asynchronous commits: the matching callback is invoked (the consumer's commitCb for automatic
 * commits, otherwise userCb) and the parameter set is destroyed here.
 * Synchronous commits: the caller is blocked on rspSem and owns the parameter set, so this
 * function only wakes it up and never frees the set.
 *
 * @param pParamSet shared state of the commit operation
 * @return 0 on success; -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer can no
 *         longer be acquired (a synchronous waiter is still woken, with that code in rspErr).
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // copy the id first: after the post below a synchronous waiter may free pParamSet at any time
  int64_t refId = pParamSet->refId;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
      taosMemoryFree(pParamSet);
    } else {
      // the waiter destroys the semaphore and frees the set after tsem_wait returns
      pParamSet->rspErr = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParamSet->rspSem);
    }
    return -1;
  }

  // if no more waiting rsp
  if (pParamSet->async) {
    // call async cb func
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    // rspSem is initialised for every parameter set, so it is destroyed here as well
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

  // balance the taosAcquireRef above, otherwise the consumer can never be freed
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}

L
Liu Jicong 已提交
423 424
/**
 * Atomically decrement the outstanding-response counter and complete the commit when it
 * reaches zero.
 *
 * @param pParamSet shared state of the commit operation; may be freed by tmqCommitDone
 */
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum != 0) {
    return;
  }
  tmqCommitDone(pParamSet);
}

431 432
/**
 * Response callback of one offset-commit request.
 *
 * Frees the offset that was sent and the response buffers, then counts the response down on
 * the shared parameter set.
 *
 * @param param the SMqCommitCbParam built in tmqSendCommitReq
 * @param pBuf  response buffer; pData and pEpSet are freed here
 * @param code  response code (not recorded by this function)
 * @return always 0.
 */
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pCbParam = param;
  SMqCommitCbParamSet* pSet = pCbParam->params;

  taosMemoryFree(pCbParam->pOffset);
  taosMemoryFree(pBuf->pData);
  taosMemoryFree(pBuf->pEpSet);

  // may complete the whole commit and release pSet
  tmqCommitRspCountDown(pSet);
  return 0;
}

L
Liu Jicong 已提交
455 456 457 458 459 460
/**
 * Build and send one offset-commit request for a (topic, vgroup) pair.
 *
 * The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". On success both the waiting
 * and the total response counters of pParamSet are incremented before the message is handed
 * to the transport; tmqCommitCb later counts the response down.
 *
 * @param tmq       consumer
 * @param pVg       vgroup whose currentOffset is committed; committedOffset is updated
 * @param pTopic    topic the vgroup belongs to
 * @param pParamSet shared state of the commit operation
 * @return 0 when the request was handed to the transport, -1 on failure (nothing is left
 *         allocated and no counter was changed).
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  void*             buf = NULL;
  SMqCommitCbParam* pParam = NULL;
  SMsgSendInfo*     pMsgSendInfo = NULL;
  int32_t           len = 0;
  int32_t           code = 0;

  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffset;

  // bounded replacement for the former memcpy + strcpy sequence; the result is always terminated
  snprintf(pOffset->subKey, sizeof(pOffset->subKey), "%s%c%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);

  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    goto _err;  // pOffset used to leak on this path
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:0x%" PRIx64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;

  // count the request before sending: the response callback may run immediately
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  // send msg
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;

_err:
  if (pParam != NULL) taosMemoryFree(pParam);
  if (buf != NULL) taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return -1;
}

/**
 * Commit the offset of the single (topic, vgroup) a polled message came from.
 *
 * A request is sent only when that vgroup has a current offset that differs from the committed
 * one; otherwise the call succeeds without doing anything.
 *
 * @param tmq       consumer
 * @param msg       polled result (data, meta or taosx response object); must not be NULL
 * @param async     non-zero: return right after sending, the result is reported through userCb
 * @param userCb    completion callback for asynchronous commits; may be NULL
 * @param userParam opaque argument forwarded to userCb
 * @return synchronous: 0 or the commit's error code, -1 when the request could not be sent.
 *         asynchronous: 0 (a send failure is reported through userCb); -1 only when the
 *         parameter set cannot be allocated.
 *         TSDB_CODE_TMQ_INVALID_MSG when msg is not a TMQ result.
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic = NULL;
  int32_t vgId = 0;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // locate the first matching vgroup that actually has something new to commit
  SMqClientTopic* pTargetTopic = NULL;
  SMqClientVg*    pTargetVg = NULL;
  for (int32_t i = 0; pTargetVg == NULL && i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        pTargetTopic = pTopic;
        pTargetVg = pVg;
        break;
      }
    }
  }

  // nothing to commit: no request was sent, so the parameter set is still exclusively ours
  if (pTargetVg == NULL) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (tmqSendCommitReq(tmq, pTargetVg, pTargetTopic, pParamSet) < 0) {
    // the request never left, so no callback will touch pParamSet
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    if (async) {
      if (userCb != NULL) userCb(tmq, -1, userParam);
      return 0;
    }
    return -1;  // a synchronous caller must see the failure
  }

  if (async) {
    // pParamSet now belongs to the response path (tmqCommitDone frees it, possibly already),
    // so it must not be touched again here
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

L
Liu Jicong 已提交
611 612
/**
 * Commit the current offsets of every vgroup of every subscribed topic.
 *
 * A request is sent for each vgroup whose current offset differs from the committed one;
 * vgroups for which the request cannot be built are skipped.
 *
 * @param tmq       consumer
 * @param automatic non-zero for timer-driven commits; the consumer's own commitCb is used
 * @param async     non-zero: return after sending, completion is reported by callback
 * @param userCb    completion callback for non-automatic asynchronous commits; may be NULL
 * @param userParam opaque argument forwarded to userCb
 * @return synchronous: 0 or the commit's error code. asynchronous: 0 once the requests are
 *         sent. -1 when the parameter set cannot be allocated (asynchronous callers are
 *         additionally notified with TSDB_CODE_OUT_OF_MEMORY).
 */
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                                     void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    int32_t code = TSDB_CODE_OUT_OF_MEMORY;
    if (async) {
      // either callback may legitimately be unset
      if (automatic) {
        if (tmq->commitCb != NULL) tmq->commitCb(tmq, code, tmq->commitCbUserParam);
      } else if (userCb != NULL) {
        userCb(tmq, code, userParam);
      }
    }
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue: responses arriving while requests are still being
  // sent cannot bring the counter to zero
  pParamSet->waitingRspNum = 1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
    tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             numOfVgroups);

    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId,
                 pVg->currentOffset.version, pVg->committedOffset.version);
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
      }
    }
  }

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // count down since waiting rsp num init as 1
  tmqCommitRspCountDown(pParamSet);

  if (async) {
    // completion is reported by callback and pParamSet is released by tmqCommitDone;
    // the former fall-through returned -1 here even though everything had been sent
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

/**
 * Commit entry point: with a message only that message's (topic, vgroup) offset is committed,
 * without one every vgroup of every subscribed topic is.
 *
 * @param tmq       consumer
 * @param msg       polled result to commit, or NULL for the whole consumer
 * @param automatic forwarded to the whole-consumer commit only
 * @param async     non-zero for an asynchronous commit
 * @param userCb    completion callback
 * @param userParam opaque argument forwarded to userCb
 * @return the result of the selected implementation.
 */
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (!msg) {
    return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
  }
  return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
}

696
void tmqAssignAskEpTask(void* param, void* tmrId) {
697 698 699
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
700
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
701 702 703 704 705
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
706 707 708
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
709 710 711
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
712
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
713 714 715 716 717
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
718 719 720
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
721 722 723
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
724
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
725 726 727 728 729
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
730 731
}

732
/**
 * Heartbeat response callback: the reply carries nothing of interest, so its buffers are
 * simply released.
 *
 * @param param unused
 * @param pMsg  response buffer, may be NULL
 * @param code  response code (ignored)
 * @return always 0.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
741 742 743
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
744
    taosMemoryFree(param);
745 746
    return;
  }
D
dapan1121 已提交
747 748 749 750 751

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
752
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
753 754 755 756
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
    return;
  }
L
Liu Jicong 已提交
757
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
758 759 760 761 762 763 764 765 766
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
    return;
  }
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
    return;
  }
767 768 769 770

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
771
    goto OVER;
772 773 774
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
D
dapan1121 已提交
775
      .len = tlen,
776 777 778 779 780 781 782
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
783
  sendInfo->msgType = TDMT_MND_TMQ_HB;
784 785 786 787 788 789 790

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
791
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
792 793
}

794
/**
 * Drain the consumer's delayed-task queue and run every task found.
 *
 * ASK_EP: ask for endpoints asynchronously and re-arm the ask-ep timer (1000 ms).
 * COMMIT: start an automatic asynchronous commit and re-arm the commit timer
 *         (autoCommitInterval).
 * REPORT: nothing to do.
 *
 * @param pTmq consumer
 * @return 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the drain batch cannot
 *         be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosReadAllQitems(pTmq->delayedTask, qall);

  tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);

  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(pTmq, true);

      // the timer callback takes ownership of this copy of the refId
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      } else {
        tscError("consumer:0x%" PRIx64 ", out of memory, ask-ep timer not re-armed", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);

      // the timer callback takes ownership of this copy of the refId
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      } else {
        tscError("consumer:0x%" PRIx64 ", out of memory, commit timer not re-armed", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856
/**
 * Release the payload owned by a queued response, according to its type. The wrapper itself is
 * not freed here; unknown types and end-markers own nothing.
 *
 * @param rspWrapper queued response item
 */
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__END_RSP:
      // do nothing
      break;

    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pEp = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pEp->msg);
      break;
    }

    case TMQ_MSG_TYPE__POLL_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosArrayDestroyP(pPoll->dataRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->dataRsp.blockDataLen);
      taosArrayDestroyP(pPoll->dataRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFree(pPoll->metaRsp.metaRsp);
      break;
    }

    case TMQ_MSG_TYPE__TAOSX_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
      taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
      // taosx
      taosArrayDestroy(pPoll->taosxRsp.createTableLen);
      taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
      break;
    }

    default:
      break;
  }
}

L
Liu Jicong 已提交
857
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
858
  SMqRspWrapper* rspWrapper = NULL;
L
Liu Jicong 已提交
859
  while (1) {
L
Liu Jicong 已提交
860 861 862 863 864
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper) {
      tmqFreeRspWrapper(rspWrapper);
      taosFreeQitem(rspWrapper);
    } else {
L
Liu Jicong 已提交
865
      break;
L
Liu Jicong 已提交
866
    }
L
Liu Jicong 已提交
867 868
  }

L
Liu Jicong 已提交
869
  rspWrapper = NULL;
L
Liu Jicong 已提交
870 871
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
L
Liu Jicong 已提交
872 873 874 875 876
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper) {
      tmqFreeRspWrapper(rspWrapper);
      taosFreeQitem(rspWrapper);
    } else {
L
Liu Jicong 已提交
877
      break;
L
Liu Jicong 已提交
878
    }
L
Liu Jicong 已提交
879 880 881
  }
}

D
dapan1121 已提交
882
/**
 * Subscribe response callback: record the result code and wake the thread waiting on rspSem.
 *
 * @param param the SMqSubscribeCbParam of the request
 * @param pMsg  response buffer; its pEpSet is freed here. A NULL buffer is tolerated, matching
 *              tmqHbCb.
 * @param code  response code, stored in rspErr
 * @return always 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pEpSet);
  }
  tsem_post(&pParam->rspSem);
  return 0;
}
890

L
Liu Jicong 已提交
891
/**
 * Collect the names of the topics the consumer is currently subscribed to.
 *
 * Stored topic names carry a prefix terminated by the first '.'; only the part after it is
 * reported. A name without '.' is reported unchanged.
 *
 * @param tmq    consumer
 * @param topics in/out: list to append to; when *topics is NULL a new list is created and
 *               returned through it
 * @return 0 on success, -1 when topics is NULL, the list cannot be created, or a name cannot
 *         be appended (the remaining names are still processed).
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (topics == NULL) return -1;

  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  int32_t code = 0;
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    // strchr returns NULL when there is no '.', and NULL + 1 is not a usable pointer
    const char* dot = strchr(topic->topicName, '.');
    const char* name = (dot != NULL) ? dot + 1 : topic->topicName;
    if (tmq_list_append(*topics, name) < 0) {
      code = -1;
    }
  }
  return code;
}

L
Liu Jicong 已提交
902
/**
 * Drop all subscriptions by subscribing to an empty topic list.
 *
 * While the server answers TSDB_CODE_MND_CONSUMER_NOT_READY the request is repeated after a
 * 500 ms pause, stopping once the retry counter exceeds 5.
 *
 * @param tmq consumer
 * @return the result of the last tmq_subscribe call, or TSDB_CODE_OUT_OF_MEMORY when the empty
 *         list cannot be created.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  int32_t     rsp;
  int32_t     retryCnt = 0;
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    } else {
      retryCnt++;
      taosMsleep(500);
    }
  }

  tmq_list_destroy(lst);
  return rsp;
}

920 921
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
922

923
  // TODO stop timer
L
Liu Jicong 已提交
924 925 926 927
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
928
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
L
Liu Jicong 已提交
929
  taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
930

931
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
932

933 934 935
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
936
    taosMemoryFreeClear(pTopic->schema.pSchema);
937 938 939 940 941
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
942 943
}

944 945 946 947 948 949 950 951 952
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
953
  if (tmqMgmt.rsetId < 0) {
954 955 956 957
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
958
// Creates a consumer configured from `conf`.
//
// Steps:
//   1. run the one-time TMQ initialization (tmqMgmtInit);
//   2. allocate the consumer with its topic array and queues;
//   3. copy the settings out of `conf` and generate a consumer id;
//   4. initialize the response semaphore and connect to the server;
//   5. register the consumer in the TMQ reference set;
//   6. when conf->hbBgEnable is set, start the heartbeat timer.
//
// conf->groupId must be non-empty (asserted). `errstr` and `errstrLen` are
// currently unused by this function.
//
// Returns the new consumer, or NULL on failure. terrno is set on the failure
// paths that assign it below.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int64_t* pHbRefId = NULL;  // argument handed to the heartbeat timer

  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId has not been filled in yet, so log the group from conf
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             conf->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  // Allocate the heartbeat timer argument before the consumer is registered,
  // so an allocation failure can still be unwound with a plain tmqFreeImpl().
  if (pTmq->hbBgEnable) {
    pHbRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pHbRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      tmqFreeImpl(pTmq);
      return NULL;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    taosMemoryFree(pHbRefId);
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pHbRefId != NULL) {
    *pHbRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pHbRefId, tmqMgmt.timer);
  }

  tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId);
  return pTmq;

FAIL:
  // reached only before the connection exists; the semaphore has either
  // failed to initialize or was already destroyed by the jumping branch
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);

  return NULL;
}

L
Liu Jicong 已提交
1053
// Subscribes the consumer to exactly the topics in `topic_list` (an empty list
// is what tmq_unsubscribe() sends).
//
// The call is synchronous:
//   1. every topic name is expanded to its full name and serialized into a
//      subscribe request;
//   2. the request is sent to the mnode and the thread blocks until
//      tmqSubscribeCb posts the response semaphore;
//   3. endpoints are fetched with tmqAskEp(), retrying every 500ms while the
//      mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY;
//   4. the ask-ep timer and, when auto commit is enabled, the commit timer are
//      started if they are not running yet.
//
// Returns 0 on success. On failure returns the code from the subscribe
// response, TSDB_CODE_OUT_OF_MEMORY for allocation failures, -1 when the
// response semaphore cannot be initialized, or
// TSDB_CODE_MND_CONSUMER_NOT_READY when the retries in step 3 are exhausted.
// Other tmqAskEp() errors end the retry loop without being reported.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 ", tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 ", subscribe topic: %s", tmq->consumerId, topicFName);

    // the array owns the name only once the push has succeeded
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      taosMemoryFree(topicFName);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // lives on this stack frame; safe because we block on rspSem below until
  // the callback is done with it
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = -1;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // giving up must not be reported as success
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry count:%d in 500ms", tmq->consumerId,
             retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  // shared exit for success and failure: release whatever is still owned here
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1177
// Stores the commit callback `cb` and its user argument `param` in `conf`.
// tmq_consumer_new() copies both into the consumer it creates.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1182

D
dapan1121 已提交
1183
// Response callback for a poll request sent to one vgroup.
//
// `param` is the SMqPollCbParam built for the request; it is freed here, as
// are pMsg->pData and pMsg->pEpSet, on every path.
//
// Outcomes:
//   - consumer no longer in the reference set: terrno is set to
//     TSDB_CODE_TMQ_CONSUMER_CLOSED, returns -1;
//   - request failed (code != 0): a consumer-mismatch error switches the
//     consumer to RECOVER, a no-committed-offset error queues an END_RSP
//     marker; returns -1 through CREATE_MSG_FAIL;
//   - response epoch older than the consumer epoch: discarded, returns 0;
//   - otherwise the response is decoded by type, pushed onto tmq->mqueue and
//     tmq->rspSem is posted; returns 0.
//
// CREATE_MSG_FAIL puts the vgroup back to IDLE (only if the request epoch is
// still current) and posts tmq->rspSem.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pCbParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pCbParam->pVg;
  SMqClientTopic* pTopic = pCbParam->pTopic;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pCbParam->refId);
  if (tmq == NULL) {
    tsem_destroy(&pCbParam->rspSem);
    taosMemoryFree(pCbParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // keep what is still needed, then drop the request parameter
  int32_t epoch = pCbParam->epoch;
  int32_t vgId = pCbParam->vgId;
  taosMemoryFree(pCbParam);

  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }

    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pEndRsp = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pEndRsp == NULL) {
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pEndRsp->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pEndRsp);
      tsem_post(&tmq->rspSem);
    }

    goto CREATE_MSG_FAIL;
  }

  int32_t rspEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t curEpoch = atomic_load_32(&tmq->epoch);
  if (rspEpoch < curEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, rspEpoch,
            curEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    return 0;
  }

  if (rspEpoch != curEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, rspEpoch, curEpoch);
  }

  // the response head tells which payload follows it
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pWrapper->tmqRspType = rspType;
  pWrapper->vgHandle = pVg;
  pWrapper->topicHandle = pTopic;

  // In each branch the payload after the head is decoded first, then the raw
  // head bytes are copied over the start of the decoded struct.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder dataDecoder;
    tDecoderInit(&dataDecoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&dataDecoder, &pWrapper->dataRsp);
    tDecoderClear(&dataDecoder);
    memcpy(&pWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:0x%" PRIx64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pWrapper->dataRsp.reqOffset.version, pWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder metaDecoder;
    tDecoderInit(&metaDecoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&metaDecoder, &pWrapper->metaRsp);
    tDecoderClear(&metaDecoder);
    memcpy(&pWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder taosxDecoder;
    tDecoderInit(&taosxDecoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&taosxDecoder, &pWrapper->taosxRsp);
    tDecoderClear(&taosxDecoder);
    memcpy(&pWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(0);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue %p", tmq->consumerId, pWrapper);

  taosWriteQitem(tmq->mqueue, pWrapper);
  tsem_post(&tmq->rspSem);

  return 0;

CREATE_MSG_FAIL:
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

L
Liu Jicong 已提交
1300
// Replaces the consumer's topic/vgroup table with the one carried in `pRsp`
// and moves the consumer to `epoch`.
//
// The current offset of every known (topic, vgId) pair is first saved in a
// temporary hash keyed "<topicName>:<vgId>". Vgroups of the new table that
// have a saved offset continue from it; all others start from an offset whose
// type is tmq->resetOffsetCfg. Ownership of each topic's schema moves from
// `pRsp` to the new table (the response's copy is cleared).
//
// Afterwards the consumer status is NO_TOPIC when the new table is empty and
// READY otherwise.
//
// Returns true when at least one vgroup was added to the new table; false
// when none was, or when the temporary array or hash could not be allocated
// (the consumer is then left unchanged).
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  bool hasVg = false;

  int32_t numOldTopics = taosArrayGetSize(tmq->clientTopics);
  int32_t numNewTopics = taosArrayGetSize(pRsp->topics);

  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:0x%" PRIx64 ", update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
           tmq->consumerId, tmq->epoch, epoch, numNewTopics, numOldTopics);

  SArray* pNewTopics = taosArrayInit(numNewTopics, sizeof(SMqClientTopic));
  if (pNewTopics == NULL) {
    return false;
  }

  SHashObj* pOffsets = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pOffsets == NULL) {
    taosArrayDestroy(pNewTopics);
    return false;
  }

  // pass 1: remember the offset of every vgroup currently being consumed
  for (int32_t t = 0; t < numOldTopics; t++) {
    SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, t);
    if (pOldTopic->vgs == NULL) {
      continue;
    }

    int32_t numOldVgs = taosArrayGetSize(pOldTopic->vgs);
    tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, numOldVgs);
    for (int32_t v = 0; v < numOldVgs; v++) {
      SMqClientVg* pOldVg = taosArrayGet(pOldTopic->vgs, v);
      sprintf(vgKey, "%s:%d", pOldTopic->topicName, pOldVg->vgId);

      char offsetStr[80];
      tFormatOffset(offsetStr, 80, &pOldVg->currentOffset);
      tscDebug("consumer:0x%" PRIx64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
               pOldVg->vgId, vgKey, offsetStr);
      taosHashPut(pOffsets, vgKey, strlen(vgKey), &pOldVg->currentOffset, sizeof(STqOffsetVal));
    }
  }

  // pass 2: build the new table from the response
  for (int32_t t = 0; t < numNewTopics; t++) {
    SMqSubTopicEp* pSrcTopic = taosArrayGet(pRsp->topics, t);
    SMqClientTopic newTopic = {0};

    // take over the schema so it is not freed together with the response
    newTopic.schema = pSrcTopic->schema;
    pSrcTopic->schema.nCols = 0;
    pSrcTopic->schema.pSchema = NULL;

    tstrncpy(newTopic.topicName, pSrcTopic->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(newTopic.db, pSrcTopic->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:0x%" PRIx64 ", update topic: %s", tmq->consumerId, newTopic.topicName);

    int32_t numNewVgs = taosArrayGetSize(pSrcTopic->vgs);
    newTopic.vgs = taosArrayInit(numNewVgs, sizeof(SMqClientVg));
    for (int32_t v = 0; v < numNewVgs; v++) {
      SMqSubVgEp* pSrcVg = taosArrayGet(pSrcTopic->vgs, v);
      sprintf(vgKey, "%s:%d", newTopic.topicName, pSrcVg->vgId);

      STqOffsetVal* pSaved = taosHashGet(pOffsets, vgKey, strlen(vgKey));
      STqOffsetVal  startOffset = {.type = tmq->resetOffsetCfg};
      if (pSaved != NULL) {
        startOffset = *pSaved;
      }

      SMqClientVg newVg = {
          .pollCnt = 0,
          .currentOffset = startOffset,
          .vgId = pSrcVg->vgId,
          .epSet = pSrcVg->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(newTopic.vgs, &newVg);
      hasVg = true;
    }
    taosArrayPush(pNewTopics, &newTopic);
  }

  // destroy current buffered existed topics info
  if (tmq->clientTopics) {
    int32_t numToFree = taosArrayGetSize(tmq->clientTopics);
    for (int32_t t = 0; t < numToFree; t++) {
      SMqClientTopic* pStale = taosArrayGet(tmq->clientTopics, t);
      if (pStale->schema.nCols) taosMemoryFreeClear(pStale->schema.pSchema);
      taosArrayDestroy(pStale->vgs);
    }

    taosArrayDestroy(tmq->clientTopics);
  }

  taosHashCleanup(pOffsets);
  tmq->clientTopics = pNewTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  }

  atomic_store_32(&tmq->epoch, epoch);
  tscDebug("consumer:0x%" PRIx64 ", update topic info completed", tmq->consumerId);
  return hasVg;
}

D
dapan1121 已提交
1401
// Response callback for the ask-endpoint request sent by tmqAskEp().
//
// `param` is the SMqAskEpCbParam created by tmqAskEp():
//   - synchronous request (async == 0): the sender is blocked on
//     pParam->rspSem and frees pParam itself, so this callback only stores the
//     result in pParam->code and posts the semaphore;
//   - asynchronous request: nobody waits, so this callback destroys the
//     semaphore and frees pParam.
// pMsg->pData and pMsg->pEpSet are freed here on every path.
//
// A response whose epoch is not newer than the consumer's epoch is ignored.
// A newer one is applied directly with tmqUpdateEp() for a synchronous
// request, or queued on tmq->mqueue as an EP_RSP item for an asynchronous one.
//
// Returns -1 when the consumer no longer exists or the queue item cannot be
// allocated, otherwise `code` as passed in.
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);

    if (!async) {
      // The sender is still waiting on this semaphore; destroying it here
      // would leave that thread blocked forever. Report the error and wake it.
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      tsem_destroy(&pParam->rspSem);
      taosMemoryFree(pParam);
    }
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId,
             pParam->async, tstrerror(code));
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
  if (head->epoch <= epoch) {
    goto END;
  }

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // pParam belongs to the woken sender from here on; do not touch it again
    tsem_post(&pParam->rspSem);
  } else {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  return code;
}

L
Liu Jicong 已提交
1472
// Asks the mnode for the consumer's current topic/vgroup endpoints.
//
// The request carries the consumer id, its epoch and its group. The response
// is handled by tmqAskEpCb.
//   - async == false: blocks until the callback posts the semaphore, then
//     returns the code the callback stored;
//   - async == true: returns right after sending; the callback owns and frees
//     the parameter block.
//
// Returns 0 (TSDB_CODE_SUCCESS) when an asynchronous request was sent, the
// callback's code for a synchronous one, or -1 when the request could not be
// built.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = TSDB_CODE_SUCCESS;

  SMqAskEpReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
  strcpy(req.cgroup, tmq->groupId);

  // first pass computes the encoded size only
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
    return -1;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
    taosMemoryFree(pReq);
    return -1;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
    taosMemoryFree(pReq);
    return -1;
  }

  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    // without a usable semaphore a synchronous caller could never be woken
    tscError("consumer:0x%" PRIx64 ", failed to init ask ep semaphore", tmq->consumerId);
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = tmq->consumerId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 ", ask ep from mnode", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // the callback leaves pParam to us in synchronous mode
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  return code;
}

L
Liu Jicong 已提交
1557
// Fills `pReq` with a poll request for vgroup `pVg` of topic `pTopic`.
//
// The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". The request
// starts from the vgroup's current offset, is addressed to the vgroup through
// head.vgId, and gets a freshly generated request id.
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // subKey: group id, separator, then the NUL-terminated topic name
  int32_t sepPos = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, sepPos);
  pReq->subKey[sepPos] = TMQ_SEPARATOR;
  strcpy(&pReq->subKey[sepPos + 1], pTopic->topicName);

  // consumer-level settings
  pReq->withTbName = tmq->withTbName;
  pReq->timeout = timeout;
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;

  // vgroup-level position and identity
  pReq->reqOffset = pVg->currentOffset;
  pReq->reqId = generateRequestId();

  pReq->useSnapshot = tmq->useSnapshot;

  pReq->head.vgId = pVg->vgId;
}

L
Liu Jicong 已提交
1579 1580
// Builds the result object for a meta response held in `pWrapper`.
//
// Topic name, db name and vgId come from the wrapper's topic/vgroup handles.
// The meta response is copied with memcpy, i.e. byte for byte: any pointers
// inside it are shared with the wrapper, not duplicated.
//
// Returns the new object, or NULL if it cannot be allocated.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1590 1591 1592
// Builds the result object for a data response held in `pWrapper`.
//
// Topic name, db name and vgId come from the wrapper's topic/vgroup handles.
// The data response is copied with memcpy, i.e. byte for byte: any pointers
// inside it are shared with the wrapper, not duplicated. The block iterator
// starts at -1. When the response carries no schema of its own, the topic's
// schema is installed in the result info.
//
// Returns the new object, or NULL if it cannot be allocated.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1608 1609
// Builds the result object for a taosx (meta + data) response held in
// `pWrapper`.
//
// Topic name, db name and vgId come from the wrapper's topic/vgroup handles.
// The taosx response is copied with memcpy, i.e. byte for byte: any pointers
// inside it are shared with the wrapper, not duplicated. The block iterator
// starts at -1. When the response carries no schema of its own, the topic's
// schema is installed in the result info.
//
// Returns the new object, or NULL if it cannot be allocated.
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1626
// broadcast the poll request to all related vnodes
1627
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
X
Xiaoyu Wang 已提交
1628 1629 1630 1631 1632 1633
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1634
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
1635
        tscTrace("consumer:0x%" PRIx64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
L
Liu Jicong 已提交
1636
                 vgSkipCnt);
X
Xiaoyu Wang 已提交
1637
        continue;
L
Liu Jicong 已提交
1638
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1639 1640 1641 1642
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
1643
        tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1644 1645
        }
#endif
X
Xiaoyu Wang 已提交
1646
      }
L
Liu Jicong 已提交
1647
      atomic_store_32(&pVg->vgSkipCnt, 0);
D
dapan1121 已提交
1648 1649 1650 1651 1652 1653 1654 1655 1656

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
L
Liu Jicong 已提交
1657
      char* msg = taosMemoryCalloc(1, msgSize);
D
dapan1121 已提交
1658 1659 1660 1661 1662
      if (NULL == msg) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
L
Liu Jicong 已提交
1663

D
dapan1121 已提交
1664 1665
      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1666
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1667
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1668 1669
        return -1;
      }
L
Liu Jicong 已提交
1670

wafwerar's avatar
wafwerar 已提交
1671
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1672
      if (pParam == NULL) {
D
dapan1121 已提交
1673
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1674
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1675
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1676 1677
        return -1;
      }
1678 1679 1680
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;

L
Liu Jicong 已提交
1681
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1682
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1683
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1684

1685
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1686
      if (sendInfo == NULL) {
D
dapan1121 已提交
1687
        taosMemoryFree(msg);
wafwerar's avatar
wafwerar 已提交
1688
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1689
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1690
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1691 1692 1693 1694
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
D
dapan1121 已提交
1695 1696
          .pData = msg,
          .len = msgSize,
X
Xiaoyu Wang 已提交
1697 1698
          .handle = NULL,
      };
1699

D
dapan1121 已提交
1700
      sendInfo->requestId = req.reqId;
X
Xiaoyu Wang 已提交
1701
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1702
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1703
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1704
      sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
X
Xiaoyu Wang 已提交
1705 1706

      int64_t transporterId = 0;
L
Liu Jicong 已提交
1707

1708
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1709
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
1710 1711

      tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
D
dapan1121 已提交
1712
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
X
Xiaoyu Wang 已提交
1713
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
1714

X
Xiaoyu Wang 已提交
1715 1716 1717 1718
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
1719

X
Xiaoyu Wang 已提交
1720 1721 1722
  return 0;
}

L
Liu Jicong 已提交
1723 1724
// Handle a queued response that is not a poll response.
//
// Only TMQ_MSG_TYPE__EP_RSP is accepted; any other type yields -1 and leaves *pReset
// untouched. For an endpoint response whose epoch is newer than the consumer's, the
// endpoint set is updated via tmqUpdateEp() and *pReset is set to true; otherwise the
// wrapper content is released and *pReset is set to false. Returns 0 in both cases.
// The queue item itself is not freed here.
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  if (!(rspWrapper->epoch > atomic_load_32(&tmq->epoch))) {
    // not newer than what the consumer already has: discard
    tmqFreeRspWrapper(rspWrapper);
    *pReset = false;
    return 0;
  }

  SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
  SMqAskEpRsp*        pEpRsp = &pEpWrapper->msg;

  tmqUpdateEp(tmq, rspWrapper->epoch, pEpRsp);
  tDeleteSMqAskEpRsp(pEpRsp);
  *pReset = true;
  return 0;
}

L
Liu Jicong 已提交
1743
// Drain queued responses until one of them produces a result object.
//
// Items are taken from tmq->qall; when it is empty it is refilled once from tmq->mqueue.
// Returns:
//   - a result object built from the first data / meta / taosx response whose epoch
//     matches the consumer epoch (data and taosx responses with blockNum == 0 are dropped);
//   - NULL with terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET when an END_RSP is consumed;
//   - NULL (terrno untouched) when both queues are empty.
// Responses with a mismatching epoch are discarded. Any other response type is passed to
// tmqHandleNoPollRsp(); if that reports a reset and pollIfReset is set, a new poll round
// is issued with the given timeout.
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
      // local batch exhausted: pull everything pending from the shared queue and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) {
        return NULL;
      }
    }

    tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper);

    const int8_t rspType = rspWrapper->tmqRspType;

    if (rspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId);

      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->dataRsp.head.epoch != consumerEpoch) {
        tscDebug("consumer:0x%" PRIx64 ", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
        continue;
      }

      // advance the vgroup offset and release the vgroup for the next poll round
      SMqClientVg* pVg = pollRspWrapper->vgHandle;
      pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pollRspWrapper->dataRsp.blockNum == 0) {
        // nothing to hand to the user, keep draining
        taosFreeQitem(pollRspWrapper);
        continue;
      }

      SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
      taosFreeQitem(pollRspWrapper);
      return pRsp;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;

      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->metaRsp.head.epoch != consumerEpoch) {
        tscDebug("consumer:0x%" PRIx64 ", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
        continue;
      }

      SMqClientVg* pVg = pollRspWrapper->vgHandle;
      pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
      taosFreeQitem(pollRspWrapper);
      return pRsp;
    }

    if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;

      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->taosxRsp.head.epoch != consumerEpoch) {
        tscDebug("consumer:0x%" PRIx64 ", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pollRspWrapper);
        continue;
      }

      SMqClientVg* pVg = pollRspWrapper->vgHandle;
      pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pollRspWrapper->taosxRsp.blockNum == 0) {
        taosFreeQitem(pollRspWrapper);
        continue;
      }

      // without created tables the response is exposed as a plain data result,
      // otherwise as a metadata (taosx) result
      void* pRsp = NULL;
      if (pollRspWrapper->taosxRsp.createTableNum == 0) {
        pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
      } else {
        pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
      }
      taosFreeQitem(pollRspWrapper);
      return pRsp;
    }

    // every remaining type goes through the non-poll handler
    bool reset = false;
    tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
    taosFreeQitem(rspWrapper);
    if (pollIfReset && reset) {
      tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1852
// Poll the subscribed topics for the next result.
//
// tmq     - consumer handle
// timeout - maximum time to wait, compared against taosGetTimestampMs() differences;
//           -1 means no deadline (the loop then re-checks after each 1000 tsem_timewait)
//
// Returns a result object, or NULL when
//   - the consumer is still in TMQ_CONSUMER_STATUS__INIT,
//   - the consumer is in RECOVER status and tmqAskEp() keeps reporting "not ready",
//   - an END_RSP was consumed (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET),
//   - the timeout elapsed without a result.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime);

  // nothing is polled while the consumer is still in INIT status
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 ", poll return since consumer status is init", tmq->consumerId);
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);

    if (tmqPollImpl(tmq, timeout) < 0) {
      // a failed poll round is only logged; responses already queued are still processed
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    // tmqHandleAllRsp() sets terrno only when it consumes an END_RSP and returns NULL with
    // terrno untouched when its queues are empty. Drop a stale code left behind by an
    // earlier call so that an empty queue is not mistaken for "no committed offset".
    if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      terrno = 0;
    }

    void* rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout != -1) {
      int64_t currentTime = taosGetTimestampMs();
      int64_t passedTime = currentTime - startTime;
      if (passedTime > timeout) {
        tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
                 tmq->consumerId, tmq->epoch, startTime, currentTime);
        return NULL;
      }
      // sleep until a response is signalled or the remaining time is used up
      tsem_timewait(&tmq->rspSem, (timeout - passedTime));
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
    }
  }
}

L
Liu Jicong 已提交
1920
// Close a consumer.
//
// When the consumer is in TMQ_CONSUMER_STATUS__READY, offsets are committed synchronously
// first; if that commit fails its code is returned and the consumer reference is NOT
// removed. Then an empty topic list is subscribed, retried with a 500 taosMsleep pause
// while the server answers TSDB_CODE_MND_CONSUMER_NOT_READY (the loop stops once retryCnt
// exceeds 5). The result of that subscribe call is not propagated. Finally the consumer
// reference is removed from tmqMgmt and 0 is returned.
int32_t tmq_consumer_close(tmq_t* tmq) {
  // status is read atomically, matching how tmq_consumer_poll() reads it
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    // subscribing to an empty list drops all current subscriptions
    tmq_list_t* lst = tmq_list_new();
    for (int32_t retryCnt = 0;; retryCnt++) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }
      taosMsleep(500);
    }
    tmq_list_destroy(lst);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1944

L
Liu Jicong 已提交
1945 1946
// Translate a tmq return code into a message: 0 -> "success", -1 -> "fail",
// anything else is looked up with tstrerror().
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1954

L
Liu Jicong 已提交
1955 1956 1957 1958 1959
// Classify a result handle: data, table meta, metadata, or TMQ_RES_INVALID when the
// handle matches none of the TD_RES_TMQ* checks.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1967
// Return the topic name of a tmq result with everything up to and including the first
// '.' stripped. If the stored name contains no '.', the stored name is returned as is
// (the previous code computed NULL + 1 in that case). Returns NULL when res is not a
// tmq data / meta / metadata result. The returned pointer refers to storage inside res.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fname = NULL;

  if (TD_RES_TMQ(res)) {
    fname = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fname = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fname = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fname, '.');
  return (dot != NULL) ? dot + 1 : fname;
}

L
Liu Jicong 已提交
1982 1983 1984 1985
// Return the database name of a tmq result with everything up to and including the first
// '.' stripped. If the stored name contains no '.', the stored name is returned as is
// (the previous code computed NULL + 1 in that case). Returns NULL when res is not a
// tmq data / meta / metadata result. The returned pointer refers to storage inside res.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fname = NULL;

  if (TD_RES_TMQ(res)) {
    fname = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fname = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fname = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fname, '.');
  return (dot != NULL) ? dot + 1 : fname;
}

L
Liu Jicong 已提交
1997 1998 1999 2000
// Return the vgId recorded in a tmq result, or -1 when res is not a tmq
// data / meta / metadata result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
2011 2012 2013 2014 2015 2016 2017 2018

// Return the table name of the block currently selected by resIter in a tmq data or
// metadata result. Returns NULL when the response carries no table names, when resIter
// is outside [0, blockNum), or when res is any other kind of result.
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       hasNames = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
    bool       inRange = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
    if (!hasNames || !inRange) {
      return NULL;
    }
    return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pMetaData = (SMqTaosxRspObj*)res;
    bool            hasNames = pMetaData->rsp.withTbName && pMetaData->rsp.blockTbName != NULL;
    bool            inRange = pMetaData->resIter >= 0 && pMetaData->resIter < pMetaData->rsp.blockNum;
    if (!hasNames || !inRange) {
      return NULL;
    }
    return (const char*)taosArrayGetP(pMetaData->rsp.blockTbName, pMetaData->resIter);
  }

  return NULL;
}
2030

L
Liu Jicong 已提交
2031
// Commit through tmqCommitInner() and report completion via cb(param).
// The result of tmqCommitInner() is discarded here.
// NOTE(review): the literal arguments 0 and 1 presumably mean "not automatic" and
// "asynchronous" - confirm against tmqCommitInner().
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

2036 2037
// Commit through tmqCommitInner() without a callback and return its result code.
// NOTE(review): the literal arguments 0 and 0 presumably mean "not automatic" and
// "synchronous" - confirm against tmqCommitInner().
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}