clientTmq.c 61.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34
#if 0
#undef tsem_post
#define tsem_post(x)                                         \
  tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \
  sem_post(x)
#endif

35
struct SMqMgmt{
36 37 38
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
39
};
L
Liu Jicong 已提交
40

41 42 43
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT;   // initialize only once
volatile int32_t    tmqInitRes = 0;                // initialize rsp code
// shared timer handle and reference-set id used by the functions in this file
static struct SMqMgmt tmqMgmt = {0};
44

L
Liu Jicong 已提交
45 46 47 48 49 50
// Common leading fields of every response wrapper. tmqFreeRspWrapper() reads
// tmqRspType to decide which concrete wrapper type the item is cast to.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
51 52 53
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
54 55
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
56
struct tmq_list_t {
L
Liu Jicong 已提交
57
  SArray container;
L
Liu Jicong 已提交
58
};
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
struct tmq_conf_t {
61 62 63 64 65
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
66 67
  int8_t  snapEnable;
  int32_t snapBatchSize;
68
  bool    hbBgEnable;
69

70 71 72 73 74
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
75
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
76
  void*          commitCbUserParam;
L
Liu Jicong 已提交
77 78 79
};

struct tmq_t {
80
  int64_t  refId;
L
Liu Jicong 已提交
81
  // conf
82 83 84 85 86 87 88 89 90
  char     groupId[TSDB_CGROUP_LEN];
  char     clientId[256];
  int8_t   withTbName;
  int8_t   useSnapshot;
  int8_t   autoCommit;
  int32_t  autoCommitInterval;
  int32_t  resetOffsetCfg;
  uint64_t consumerId;
  bool     hbBgEnable;
91

L
Liu Jicong 已提交
92 93
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
94 95 96 97

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
98 99
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
100
  int32_t epSkipCnt;
L
Liu Jicong 已提交
101
#endif
L
Liu Jicong 已提交
102 103
  int64_t pollCnt;

L
Liu Jicong 已提交
104
  // timer
105 106
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
107 108 109
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
110 111 112 113
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
114
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
115
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
116
  STaosQall*  qall;
L
Liu Jicong 已提交
117 118 119 120
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
121 122
};

X
Xiaoyu Wang 已提交
123 124 125 126 127 128 129 130
// Values for SMqClientVg.vgStatus.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status values (presumably stored in tmq_t.status; the assignment is
// outside this part of the file).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
135
// Task kinds written to tmq_t.delayedTask by the timer callbacks and consumed
// by tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
141
typedef struct {
142 143 144
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
145 146
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
147
  // connection info
148
  int32_t vgId;
X
Xiaoyu Wang 已提交
149
  int32_t vgStatus;
L
Liu Jicong 已提交
150
  int32_t vgSkipCnt;
151 152 153
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
154
typedef struct {
155
  // subscribe info
156 157 158
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
159
  SSchemaWrapper schema;
160 161
} SMqClientTopic;

L
Liu Jicong 已提交
162 163 164 165 166
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
167
  union {
L
Liu Jicong 已提交
168 169
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
170
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
171
  };
L
Liu Jicong 已提交
172 173
} SMqPollRspWrapper;

L
Liu Jicong 已提交
174
typedef struct {
175 176
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
177 178
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
179
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
180

L
Liu Jicong 已提交
181
typedef struct {
182 183
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
184
  int32_t code;
L
Liu Jicong 已提交
185
  int32_t async;
X
Xiaoyu Wang 已提交
186
  tsem_t  rspSem;
187 188
} SMqAskEpCbParam;

L
Liu Jicong 已提交
189
typedef struct {
190 191
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
192
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
193
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
194
  int32_t         vgId;
L
Liu Jicong 已提交
195
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
196
} SMqPollCbParam;
197

198
typedef struct {
199 200
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
201 202
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
203 204
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
205
  int32_t        rspErr;
206
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
207 208 209 210
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
211 212 213 214 215
} SMqCommitCbParamSet;

// Per-request context handed to tmqCommitCb().
typedef struct {
  SMqCommitCbParamSet* params;   // shared state of the enclosing commit call
  STqOffset*           pOffset;  // offset that was encoded into the request; freed by tmqCommitCb()
  /*char                 topicName[TSDB_TOPIC_FNAME_LEN];*/
  /*int32_t              vgId;*/
} SMqCommitCbParam;
219

220 221
static int32_t tmqAskEp(tmq_t* tmq, bool async);

222
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
223
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
224 225 226 227 228
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

229
  conf->withTbName = false;
L
Liu Jicong 已提交
230
  conf->autoCommit = true;
L
Liu Jicong 已提交
231
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
232
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
233
  conf->hbBgEnable = true;
234

235 236 237
  return conf;
}

L
Liu Jicong 已提交
238
/**
 * Release a configuration together with the connection strings it owns.
 *
 * @param conf configuration to free; NULL is accepted and ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
248
  if (strcmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
249
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
250
    return TMQ_CONF_OK;
251
  }
L
Liu Jicong 已提交
252

253
  if (strcmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
254
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
255 256
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
257

L
Liu Jicong 已提交
258 259
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
260
      conf->autoCommit = true;
L
Liu Jicong 已提交
261 262
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
263
      conf->autoCommit = false;
L
Liu Jicong 已提交
264 265 266 267
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
268
  }
L
Liu Jicong 已提交
269

L
Liu Jicong 已提交
270 271 272 273 274
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
289

290 291
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
292
      conf->withTbName = true;
L
Liu Jicong 已提交
293
      return TMQ_CONF_OK;
294
    } else if (strcmp(value, "false") == 0) {
295
      conf->withTbName = false;
L
Liu Jicong 已提交
296
      return TMQ_CONF_OK;
297 298 299 300 301
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
302
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
303
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
304
      conf->snapEnable = true;
305 306
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
307
      conf->snapEnable = false;
308 309 310 311 312 313
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
314 315 316 317 318
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

319 320 321
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
322 323
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
324
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
325 326 327 328
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
329
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
330 331
  }

L
Liu Jicong 已提交
332
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
333 334 335
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
336
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
337 338 339
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
340
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
341 342 343
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
344
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
345 346 347
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
348
  if (strcmp(key, "td.connect.db") == 0) {
349
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
350 351 352
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
353
  return TMQ_CONF_UNKNOWN;
354 355 356
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
357 358
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
359 360
}

L
Liu Jicong 已提交
361 362
/**
 * Append a copy of a topic name to the list.
 *
 * Names that start with a backtick are stored unchanged; all others are passed
 * through strtolower() first.
 *
 * @param list list to extend
 * @param src  topic name; must be non-NULL and non-empty
 * @return 0 on success, -1 on invalid arguments or allocation failure.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = strdup(src);
  if (topic == NULL) return -1;

  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  if (taosArrayPush(&list->container, &topic) == NULL) {
    // the list did not take ownership, so the copy must be released here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
372
/**
 * Free the list and every topic name stored in it.
 *
 * @param list list to destroy; NULL is accepted and ignored.
 */
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) {
    return;
  }
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
377 378 379 380 381 382 383 384 385 386
/** Number of topic names currently stored in the list. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/**
 * Expose the stored topic names as a plain array.
 *
 * The returned pointer is the list's own element storage, not a copy.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* names = &list->container;
  char**        raw = names->pData;
  return raw;
}

L
Liu Jicong 已提交
387 388 389 390
/**
 * Format "<topicName>:<vg>" into dst.
 *
 * @param dst       destination buffer; the caller must make it large enough,
 *                  no bound is checked here
 * @param topicName topic name
 * @param vg        vgroup id
 * @return number of characters written, excluding the terminating NUL.
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
/**
 * Finish a commit once no response is outstanding any more.
 *
 * Asynchronous commits: the matching callback (tmq->commitCb for automatic
 * commits, otherwise pParamSet->userCb) is invoked with pParamSet->rspErr and
 * the parameter set is destroyed here.
 * Synchronous commits: the waiter blocked on pParamSet->rspSem is woken up and
 * stays responsible for destroying the parameter set.
 *
 * @param pParamSet shared state of the commit call
 * @return 0 on success; -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED when the
 *         consumer can no longer be acquired.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // remember the id: pParamSet may be freed before the reference is released
  const int64_t refId = pParamSet->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
      taosMemoryFree(pParamSet);
    } else {
      // a synchronous caller is blocked on rspSem and frees the set itself;
      // freeing it here would leave that caller waiting on released memory
      pParamSet->rspErr = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParamSet->rspSem);
    }
    return -1;
  }

  if (pParamSet->async) {
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    // nobody waits on the semaphore in async mode; release it with the set
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

  // balance the taosAcquireRef() above
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}

L
Liu Jicong 已提交
423 424
/**
 * Account for one finished commit response and complete the commit through
 * tmqCommitDone() when it was the last outstanding one.
 */
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }
}

431 432
/**
 * Response callback of a single offset commit request.
 *
 * Releases the offset object and the response buffers, records a failure code
 * in the shared parameter set and counts the response down.
 *
 * @param param SMqCommitCbParam of the request
 * @param pBuf  response buffers; may be NULL
 * @param code  result code of the request, 0 on success
 * @return always 0
 */
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  if (code != 0) {
    // keep the failure so synchronous commits and commit callbacks can report
    // it; when several requests fail the last one written wins
    pParamSet->rspErr = code;
  }

  taosMemoryFree(pParam->pOffset);
  if (pBuf) {
    taosMemoryFree(pBuf->pData);
    taosMemoryFree(pBuf->pEpSet);
  }

  tmqCommitRspCountDown(pParamSet);

  return 0;
}

L
Liu Jicong 已提交
455 456 457 458 459 460
/**
 * Send the current offset of one vgroup of one topic to the server.
 *
 * The request is sent asynchronously; tmqCommitCb() runs when the response
 * arrives. On success pParamSet->waitingRspNum and totalRspNum are each
 * incremented by one before the message is handed to the transport.
 *
 * @param tmq       consumer
 * @param pVg       vgroup whose currentOffset is committed
 * @param pTopic    topic the vgroup belongs to
 * @param pParamSet shared state of the enclosing commit call
 * @return 0 when the request was handed to the transport, -1 on failure
 *         (nothing was sent and the counters are untouched).
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pOffset->val = pVg->currentOffset;

  // subKey is "<groupId><TMQ_SEPARATOR><topicName>"
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    taosMemoryFree(pOffset);
    return -1;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  // the encoded offset follows the message head
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  SEp* pEp = &pVg->epSet.eps[pVg->epSet.inUse];
  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64" prev:%"PRId64", ep:%s:%d", tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;

  // count the request before it is sent so the callback never sees a stale counter
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;
}

/**
 * Commit the offset of the topic/vgroup a polled message came from.
 *
 * @param tmq       consumer
 * @param msg       polled result; must be a TMQ data, meta or metadata result
 * @param async     non-zero: return immediately, the outcome is delivered to userCb;
 *                  zero: block until the response arrived
 * @param userCb    callback for asynchronous commits; may be NULL
 * @param userParam passed back to userCb
 * @return TSDB_CODE_TMQ_INVALID_MSG for an unsupported result type, -1 when the
 *         parameter set cannot be allocated; otherwise 0 when nothing had to
 *         be committed or the request was sent asynchronously (send failures
 *         are reported to userCb), and for synchronous commits the commit
 *         result (-1 when sending failed).
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // Tracked locally: once a request is out in async mode the response callback
  // may free pParamSet at any time, so it must not be read again.
  bool sent = false;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          tsem_destroy(&pParamSet->rspSem);
          taosMemoryFree(pParamSet);
          if (async) {
            // asynchronous failures are reported through the callback only
            if (userCb != NULL) userCb(tmq, -1, userParam);
            return 0;
          }
          return -1;
        }
        sent = true;
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (!sent) {
    // nothing to commit for this message
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (async) {
    // tmqCommitDone() runs the callback and owns pParamSet from here on
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

L
Liu Jicong 已提交
617 618
/**
 * Commit the current offsets of every vgroup of every subscribed topic.
 *
 * @param tmq       consumer
 * @param automatic non-zero for auto commit: results go to tmq->commitCb
 * @param async     non-zero: do not wait for the responses
 * @param userCb    callback for manual asynchronous commits; may be NULL
 * @param userParam passed back to userCb
 * @return -1 when the parameter set cannot be allocated; 0 when nothing had to
 *         be committed or the requests were sent asynchronously; otherwise the
 *         result of the synchronous commit.
 */
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                                     void* userParam) {
  int32_t code = -1;

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    if (async) {
      // either callback may be unset
      if (automatic) {
        if (tmq->commitCb != NULL) tmq->commitCb(tmq, code, tmq->commitCbUserParam);
      } else if (userCb != NULL) {
        userCb(tmq, code, userParam);
      }
    }
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue: responses arriving while requests
  // are still being sent cannot bring the counter to zero
  pParamSet->waitingRspNum = 1;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%"PRIx64" start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        // a vgroup whose request cannot be sent is skipped; the others still commit
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
      }
    }
  }

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // count down since waiting rsp num init as 1
  tmqCommitRspCountDown(pParamSet);

  if (async) {
    // pParamSet now belongs to tmqCommitDone() and may already be gone
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);

  return code;
}

/**
 * Commit entry point: with a message only that message's topic/vgroup is
 * committed (tmqCommitMsgImpl), without one every subscribed vgroup is
 * committed (tmqCommitConsumerImpl).
 *
 * @return the result of the selected implementation.
 */
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (msg == NULL) {
    return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
  }
  return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
}

699
void tmqAssignAskEpTask(void* param, void* tmrId) {
700 701 702
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
703
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
704 705 706 707 708
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
709 710 711
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
712 713 714
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
715
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
716 717 718 719 720
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
721 722 723
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
724 725 726
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
727
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
728 729 730 731 732
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
733 734
}

735
/**
 * Response callback of a heartbeat request: only releases the response buffers.
 *
 * @param param unused
 * @param pMsg  response buffers; may be NULL
 * @param code  unused
 * @return always 0
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
744 745 746
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
747
    taosMemoryFree(param);
748 749
    return;
  }
D
dapan1121 已提交
750 751 752 753 754

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
755
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
756 757 758 759
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
    return;
  }
L
Liu Jicong 已提交
760
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
761 762 763 764 765 766 767 768 769
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
    return;
  }
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
    return;
  }
770 771 772 773

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
774
    goto OVER;
775 776 777
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
D
dapan1121 已提交
778
      .len = tlen,
779 780 781 782 783 784 785
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
786
  sendInfo->msgType = TDMT_MND_TMQ_HB;
787 788 789 790 791 792 793

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
794
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
795 796
}

797
/**
 * Run every delayed task queued for the consumer.
 *
 * ASK_EP refreshes the endpoints asynchronously and schedules the next refresh
 * after 1000 (timer units of taosTmrReset); COMMIT performs an automatic
 * asynchronous commit and schedules the next one after autoCommitInterval;
 * REPORT does nothing.
 *
 * @param pTmq consumer
 * @return 0 on success (including "no task queued"), -1 with terrno set when
 *         the temporary queue cannot be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(pTmq, true);

      // the timer callback takes ownership of pRefId and frees it
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%"PRIx64" next retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      } else {
        tscError("consumer:0x%" PRIx64 " failed to schedule next ep retrieval, out of memory", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);

      // the timer callback takes ownership of pRefId and frees it
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%"PRIx64" next commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      } else {
        tscError("consumer:0x%" PRIx64 " failed to schedule next auto commit, out of memory", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // do nothing
    } else {
      ASSERT(0);
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867
/**
 * Release the payload owned by a response wrapper. The wrapper itself is not
 * freed here. Which payload exists is decided by tmqRspType; unknown types and
 * TMQ_MSG_TYPE__END_RSP own nothing.
 */
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  const int8_t rspType = rspWrapper->tmqRspType;

  if (rspType == TMQ_MSG_TYPE__END_RSP) {
    return;
  }

  if (rspType == TMQ_MSG_TYPE__EP_RSP) {
    SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tDeleteSMqAskEpRsp(&pEpWrapper->msg);
    return;
  }

  // every remaining known type is a poll wrapper; the type picks the union member
  SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;

  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    taosArrayDestroyP(pPoll->dataRsp.blockData, taosMemoryFree);
    taosArrayDestroy(pPoll->dataRsp.blockDataLen);
    taosArrayDestroyP(pPoll->dataRsp.blockTbName, taosMemoryFree);
    taosArrayDestroyP(pPoll->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
    return;
  }

  if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    taosMemoryFree(pPoll->metaRsp.metaRsp);
    return;
  }

  if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
    taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
    taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
    taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
    // taosx
    taosArrayDestroy(pPoll->taosxRsp.createTableLen);
    taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
  }
}

L
Liu Jicong 已提交
868
/*
 * Discard every response that has not been handed to the user yet.
 *
 * Two passes are made: first over the items already sitting in tmq->qall, then over
 * whatever taosReadAllQitems moves from tmq->mqueue into tmq->qall. Each item has its
 * payload released with tmqFreeRspWrapper before the item itself is freed.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* pItem = NULL;

  // pass 1: items already staged in qall
  for (;;) {
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (!pItem) {
      break;
    }
    tmqFreeRspWrapper(pItem);
    taosFreeQitem(pItem);
  }

  // pass 2: pull the remaining items out of mqueue and drop them as well
  pItem = NULL;
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  for (;;) {
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (!pItem) {
      break;
    }
    tmqFreeRspWrapper(pItem);
    taosFreeQitem(pItem);
  }
}

D
dapan1121 已提交
893
/*
 * Completion callback for a subscribe request.
 *
 * param: SMqSubscribeCbParam* supplied by the sender; receives the response code in rspErr.
 * pMsg:  response buffers; both pData and pEpSet are released here, as the other callbacks
 *        in this file do for their responses. May be NULL, in which case nothing is freed.
 * code:  result code of the request, stored verbatim.
 *
 * Always returns 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  if (pMsg != NULL) {
    // the response body is not consumed by subscribe, so release it here instead of leaking it
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  // wake the waiter last: pParam must not be touched after the post, the waiter owns it
  tsem_post(&pParam->rspSem);
  return 0;
}
901

L
Liu Jicong 已提交
902
/*
 * Append the names of the topics currently held in tmq->clientTopics to *topics.
 *
 * topics: in/out; when *topics is NULL a new list is created with tmq_list_new and stored there.
 *
 * Each stored topic name is appended without its leading "<prefix>." part (everything up to
 * and including the first '.'). A name that contains no '.' is appended unchanged.
 *
 * Returns 0 on success, -1 when the list could not be created.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return -1;
    }
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr yields NULL when there is no '.', and NULL + 1 must never be formed
    char* dot = strchr(topic->topicName, '.');
    char* shortName = (dot != NULL) ? dot + 1 : topic->topicName;
    tmq_list_append(*topics, shortName);
  }

  return 0;
}

L
Liu Jicong 已提交
913
/*
 * Unsubscribe from all topics by subscribing to an empty topic list.
 *
 * The subscribe call is repeated, 500 ms apart, while it answers
 * TSDB_CODE_MND_CONSUMER_NOT_READY and the retry budget is not used up.
 *
 * Returns the code of the last tmq_subscribe call, or -1 when the empty list could not
 * be allocated.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    // without a list object there is nothing valid to hand to tmq_subscribe
    return -1;
  }

  int32_t rsp = 0;
  int32_t retryCnt = 0;
  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    }

    retryCnt++;
    taosMsleep(500);
  }

  tmq_list_destroy(lst);
  return rsp;
}

931 932
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
933

934
  // TODO stop timer
L
Liu Jicong 已提交
935 936 937 938
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
939
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
L
Liu Jicong 已提交
940
  taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
941

942
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
943

944 945 946
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
947
    taosMemoryFreeClear(pTopic->schema.pSchema);
948 949 950 951 952
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
953 954
}

955 956 957 958 959 960 961 962 963
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
964
  if (tmqMgmt.rsetId < 0) {
965 966 967 968
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
969
/*
 * Create a consumer from a populated configuration.
 *
 * conf:      consumer configuration; conf->groupId must be non-empty (asserted).
 * errstr,
 * errstrLen: accepted for API compatibility; this function does not write to errstr.
 *
 * Returns the new consumer, or NULL on failure. On the failure paths that set it, terrno
 * holds the reason.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  // process-wide setup of the shared timer and the reference set, executed once
  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(conf->groupId[0]);

  // assign consumerId and copy the identity strings first, so that every failure log
  // below reports the real consumer id and group instead of zeroed fields
  pTmq->consumerId = tGenIdPI64();
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  // Allocate the heartbeat timer parameter before the consumer is registered in the
  // reference set: at this point a failure can still be undone with a plain tmqFreeImpl.
  int64_t* pHbRefId = NULL;
  if (pTmq->hbBgEnable) {
    pHbRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pHbRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      tmqFreeImpl(pTmq);
      return NULL;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    if (pHbRefId != NULL) {
      taosMemoryFree(pHbRefId);
    }
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pHbRefId != NULL) {
    // the timer callback receives the heap copy of the refId
    *pHbRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pHbRefId, tmqMgmt.timer);
  }

  tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId);
  return pTmq;

FAIL:
  // only reached before the connection exists; release whatever was allocated so far
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);

  return NULL;
}

L
Liu Jicong 已提交
1064
/*
 * Subscribe the consumer to the topics in topic_list (an empty list unsubscribes).
 *
 * The request is sent to the mnode and this function blocks until tmqSubscribeCb posts the
 * response semaphore. Afterwards endpoints are requested synchronously, retrying every
 * 500 ms while the mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY, and the ask-ep and
 * auto-commit timers are started if they are not running yet.
 *
 * Returns 0 on success, otherwise an error code:
 *   - TSDB_CODE_OUT_OF_MEMORY when a local allocation or the semaphore setup fails,
 *   - the response code delivered by the subscribe callback,
 *   - TSDB_CODE_MND_CONSUMER_NOT_READY when the retry budget for ask-ep is exhausted.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%"PRIx64" tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // expand every topic into its full name; the array owns the allocated strings
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      // without this the function would fall through to FAIL and report success
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%"PRIx64" subscribe topic: %s", tmq->consumerId, topicFName);

    taosArrayPush(req.topicNames, &topicFName);
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // lives on this stack frame; valid for the callback because we block on rspSem below
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // semaphore could not be created; reported as a resource failure rather than success
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  // Only the "not ready" answer is retried; any other tmqAskEp result ends the loop and is
  // not propagated (unchanged behaviour).
  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // give up, and tell the caller why instead of returning 0
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscDebug("consumer:0x%"PRIx64", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  // shared exit for success and failure: buf and sendInfo are NULL once the message was handed off
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1188
/*
 * Store the commit callback and its user parameter in the configuration.
 * Both values are saved as given; no validation is performed.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1193

D
dapan1121 已提交
1194
/*
 * Callback for a poll request sent to a vnode.
 *
 * param: SMqPollCbParam* created for the request; it is freed here on every path.
 * pMsg:  response buffers; pData and pEpSet are freed here on every path.
 * code:  result code of the request.
 *
 * On success the response is decoded into a queue item according to its message type,
 * pushed to tmq->mqueue, and tmq->rspSem is posted. Responses from an earlier epoch are
 * dropped. Returns 0 when the response was queued or dropped as stale, -1 otherwise.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pCbParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pCbParam->pVg;
  SMqClientTopic* pTopic = pCbParam->pTopic;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pCbParam->refId);
  if (NULL == tmq) {
    // no consumer could be acquired for this refId: release everything handed to the callback
    tsem_destroy(&pCbParam->rspSem);
    taosMemoryFree(pCbParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // keep the two values still needed, then drop the request parameter
  int32_t epoch = pCbParam->epoch;
  int32_t vgId = pCbParam->vgId;
  taosMemoryFree(pCbParam);

  if (code != 0) {
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      // queue an end-marker response for the poller
      SMqPollRspWrapper* pEndRsp = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pEndRsp == NULL) {
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
      } else {
        pEndRsp->tmqRspType = TMQ_MSG_TYPE__END_RSP;
        taosWriteQitem(tmq->mqueue, pEndRsp);
        tsem_post(&tmq->rspSem);
      }
    }

    goto CREATE_MSG_FAIL;
  }

  SMqRspHead* pHead = (SMqRspHead*)pMsg->pData;
  int32_t     msgEpoch = pHead->epoch;
  int32_t     tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // the message type in the head selects the decoder below
  int8_t rspType = pHead->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // In each branch the body after the head is decoded first; the head bytes are then
  // copied over the start of the decoded structure.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder dataDecoder;
    tDecoderInit(&dataDecoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&dataDecoder, &pRspWrapper->dataRsp);
    tDecoderClear(&dataDecoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:0x%" PRIx64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder metaDecoder;
    tDecoderInit(&metaDecoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&metaDecoder, &pRspWrapper->metaRsp);
    tDecoderClear(&metaDecoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder taosxDecoder;
    tDecoderInit(&taosxDecoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&taosxDecoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&taosxDecoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(0);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  return 0;

CREATE_MSG_FAIL:
  // the vgroup is marked idle again only while the request's epoch is still the current one
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  return -1;
}

L
Liu Jicong 已提交
1311
/*
 * Replace the consumer's cached topic/vgroup layout with the one carried by pRsp.
 *
 * Offsets of vgroups that exist in both the old and the new layout are carried over, keyed
 * by "<topicName>:<vgId>"; vgroups seen for the first time start from tmq->resetOffsetCfg.
 * Schema ownership moves from the response entries to the new topics (the response fields
 * are cleared). The old layout is destroyed, then status and epoch are updated.
 *
 * Returns true when at least one vgroup was added to the new layout; false when none was,
 * or when the temporary array or hash could not be allocated (nothing is changed then).
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  bool hasVg = false;

  int32_t numOfCurTopics = taosArrayGetSize(tmq->clientTopics);
  int32_t numOfNewTopics = taosArrayGetSize(pRsp->topics);

  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:0x%" PRIx64" update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
           tmq->consumerId, tmq->epoch, epoch, numOfNewTopics, numOfCurTopics);

  SArray* newTopics = taosArrayInit(numOfNewTopics, sizeof(SMqClientTopic));
  if (NULL == newTopics) {
    return false;
  }

  SHashObj* pOffsetHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (NULL == pOffsetHash) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // step 1: remember the current offset of every vgroup in the existing layout
  for (int32_t t = 0; t < numOfCurTopics; t++) {
    SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, t);
    if (!pOldTopic->vgs) {
      continue;
    }

    int32_t numOfOldVgs = taosArrayGetSize(pOldTopic->vgs);
    tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, numOfOldVgs);
    for (int32_t v = 0; v < numOfOldVgs; v++) {
      SMqClientVg* pOldVg = taosArrayGet(pOldTopic->vgs, v);
      sprintf(vgKey, "%s:%d", pOldTopic->topicName, pOldVg->vgId);
      char buf[80];
      tFormatOffset(buf, 80, &pOldVg->currentOffset);
      tscDebug("consumer:0x%" PRIx64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
               pOldVg->vgId, vgKey, buf);
      taosHashPut(pOffsetHash, vgKey, strlen(vgKey), &pOldVg->currentOffset, sizeof(STqOffsetVal));
    }
  }

  // step 2: build the new layout from the response
  for (int32_t t = 0; t < numOfNewTopics; t++) {
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, t);
    SMqClientTopic topic = {0};

    // take over the schema; clearing the source leaves a single owner
    topic.schema = pTopicEp->schema;
    pTopicEp->schema.nCols = 0;
    pTopicEp->schema.pSchema = NULL;
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:0x%" PRIx64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t numOfNewVgs = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(numOfNewVgs, sizeof(SMqClientVg));
    for (int32_t v = 0; v < numOfNewVgs; v++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, v);
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);

      // reuse the remembered offset if this vgroup was known, otherwise use the reset policy
      STqOffsetVal* pSaved = taosHashGet(pOffsetHash, vgKey, strlen(vgKey));
      STqOffsetVal  startOffset = {.type = tmq->resetOffsetCfg};
      if (pSaved != NULL) {
        startOffset = *pSaved;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = startOffset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      hasVg = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // step 3: destroy current buffered existed topics info
  if (tmq->clientTopics) {
    int32_t numOfOld = taosArrayGetSize(tmq->clientTopics);
    for (int32_t t = 0; t < numOfOld; t++) {
      SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, t);
      if (pOldTopic->schema.nCols) taosMemoryFreeClear(pOldTopic->schema.pSchema);
      taosArrayDestroy(pOldTopic->vgs);
    }

    taosArrayDestroy(tmq->clientTopics);
  }

  taosHashCleanup(pOffsetHash);
  tmq->clientTopics = newTopics;

  // step 4: publish status, then the new epoch
  if (0 == taosArrayGetSize(tmq->clientTopics)) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  }

  atomic_store_32(&tmq->epoch, epoch);
  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
  return hasVg;
}

D
dapan1121 已提交
1412
/*
 * Callback for an ask-endpoint request.
 *
 * param: SMqAskEpCbParam* created by the sender.
 *        - synchronous request (async == 0): the sender waits on pParam->rspSem and releases
 *          pParam itself, so this callback only stores the result code and posts the semaphore;
 *        - asynchronous request: this callback destroys the semaphore and frees pParam.
 * pMsg:  response buffers; pData and pEpSet are freed here on every path.
 * code:  result code of the request.
 *
 * A response whose epoch is not newer than the consumer's epoch is ignored. A newer one is
 * applied directly (synchronous) or queued to tmq->mqueue as an EP response (asynchronous).
 *
 * Returns -1 when the consumer no longer exists (terrno is set to
 * TSDB_CODE_TMQ_CONSUMER_CLOSED) or when the queue item cannot be allocated; otherwise
 * returns code.
 */
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);

    if (!async) {
      // A synchronous sender is blocked on rspSem: hand it the error and wake it up.
      // Destroying the semaphore here instead would leave the sender waiting forever.
      // pParam must not be touched after the post, the sender frees it.
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      tsem_destroy(&pParam->rspSem);
      taosMemoryFree(pParam);
    }
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId,
             pParam->async, tstrerror(code));
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  if (head->epoch <= epoch) {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
             tmq->consumerId, head->epoch, epoch);
    goto END;
  }

  tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
           head->epoch, epoch);

  if (!async) {
    // synchronous: decode and apply the new layout right here
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    // asynchronous: queue the decoded response for later processing
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // the waiting sender owns pParam from here on
    tsem_post(&pParam->rspSem);
  } else {
    // nobody waits on an asynchronous request: release the semaphore together with pParam
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  return code;
}

L
Liu Jicong 已提交
1487
/*
 * Ask the mnode for the consumer's current endpoint layout.
 *
 * async == false: blocks until tmqAskEpCb posts the request semaphore, then returns the
 *                 code stored by the callback and releases the callback parameter.
 * async == true:  returns right after the request is handed to the transport; the callback
 *                 owns and releases the parameter.
 *
 * Returns TSDB_CODE_SUCCESS (or the callback's code for a synchronous request); -1 when the
 * request could not be built because of a serialization, allocation or semaphore failure.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = TSDB_CODE_SUCCESS;

  SMqAskEpReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
  strcpy(req.cgroup, tmq->groupId);

  // first call only computes the encoded size
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq failed", tmq->consumerId);
    return -1;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%"PRIx64", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
    taosMemoryFree(pReq);
    return -1;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%"PRIx64", failed to malloc subscribe param", tmq->consumerId);
    taosMemoryFree(pReq);
    return -1;
  }

  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    // a synchronous request would wait on an unusable semaphore, so stop here
    tscError("consumer:0x%"PRIx64", failed to init ask ep semaphore", tmq->consumerId);
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = tmq->consumerId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%"PRIx64, tmq->consumerId, async, tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // synchronous: wait for the callback, then release what this side owns
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  return code;
}

L
Liu Jicong 已提交
1572
/*
 * Fill a poll request for one vgroup of one topic.
 *
 * The subscription key is written as "<groupId><TMQ_SEPARATOR><topicName>"; the remaining
 * fields are copied from the consumer, the vgroup and the timeout argument, and a fresh
 * request id is generated.
 */
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // subKey = group id, separator, topic name (the strcpy adds the terminator)
  int32_t groupLen = strlen(tmq->groupId);
  char*   pKey = pReq->subKey;
  memcpy(pKey, tmq->groupId, groupLen);
  pKey[groupLen] = TMQ_SEPARATOR;
  strcpy(&pKey[groupLen + 1], pTopic->topicName);

  // consumer-level settings
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->timeout = timeout;

  // vgroup-level settings
  pReq->reqOffset = pVg->currentOffset;
  pReq->head.vgId = pVg->vgId;

  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1594 1595
/*
 * Build a meta result object from a queued poll response.
 *
 * Topic name, db name and vgId are copied from the wrapper's topic/vgroup handles, and the
 * meta response is copied bytewise, so pointers inside it are shared with the wrapper.
 *
 * Returns the new object, or NULL when it cannot be allocated.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    // allocation failed: report it to the caller instead of dereferencing NULL
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1605 1606 1607
/*
 * Build a data result object from a queued poll response.
 *
 * Topic name, db name and vgId are copied from the wrapper's topic/vgroup handles, and the
 * data response is copied bytewise, so pointers inside it are shared with the wrapper.
 * When the response carries no schema of its own, the topic's schema is installed in the
 * result info.
 *
 * Returns the new object, or NULL when it cannot be allocated.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // allocation failed: report it to the caller instead of dereferencing NULL
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1623 1624
/*
 * Build a taosx (meta + data) result object from a queued poll response.
 *
 * Topic name, db name and vgId are copied from the wrapper's topic/vgroup handles, and the
 * taosx response is copied bytewise, so pointers inside it are shared with the wrapper.
 * When the response carries no schema of its own, the topic's schema is installed in the
 * result info.
 *
 * Returns the new object, or NULL when it cannot be allocated.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    // allocation failed: report it to the caller instead of dereferencing NULL
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1641
// broadcast the poll request to all related vnodes
1642
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1643 1644 1645 1646 1647
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64" start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);

  for (int i = 0; i < numOfTopics; i++) {

X
Xiaoyu Wang 已提交
1648 1649 1650 1651
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1652
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1653
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
1654
        tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
L
Liu Jicong 已提交
1655
                 vgSkipCnt);
X
Xiaoyu Wang 已提交
1656
        continue;
L
Liu Jicong 已提交
1657
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1658 1659 1660 1661
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
1662
        tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1663 1664
        }
#endif
X
Xiaoyu Wang 已提交
1665
      }
1666

L
Liu Jicong 已提交
1667
      atomic_store_32(&pVg->vgSkipCnt, 0);
D
dapan1121 已提交
1668 1669 1670 1671 1672 1673 1674 1675 1676

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
1677

L
Liu Jicong 已提交
1678
      char* msg = taosMemoryCalloc(1, msgSize);
D
dapan1121 已提交
1679 1680 1681 1682 1683
      if (NULL == msg) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
L
Liu Jicong 已提交
1684

D
dapan1121 已提交
1685 1686
      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1687
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1688
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1689 1690
        return -1;
      }
L
Liu Jicong 已提交
1691

wafwerar's avatar
wafwerar 已提交
1692
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1693
      if (pParam == NULL) {
D
dapan1121 已提交
1694
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1695
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1696
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1697 1698
        return -1;
      }
1699

1700 1701 1702
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;

L
Liu Jicong 已提交
1703
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1704
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1705
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1706

1707
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1708
      if (sendInfo == NULL) {
D
dapan1121 已提交
1709
        taosMemoryFree(msg);
wafwerar's avatar
wafwerar 已提交
1710
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1711
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1712
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1713 1714 1715 1716
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
D
dapan1121 已提交
1717 1718
          .pData = msg,
          .len = msgSize,
X
Xiaoyu Wang 已提交
1719 1720
          .handle = NULL,
      };
1721

D
dapan1121 已提交
1722
      sendInfo->requestId = req.reqId;
X
Xiaoyu Wang 已提交
1723
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1724
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1725
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1726
      sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
X
Xiaoyu Wang 已提交
1727 1728

      int64_t transporterId = 0;
L
Liu Jicong 已提交
1729

1730
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1731
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
1732 1733

      tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
D
dapan1121 已提交
1734
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
X
Xiaoyu Wang 已提交
1735
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
1736

X
Xiaoyu Wang 已提交
1737 1738 1739 1740
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
1741

X
Xiaoyu Wang 已提交
1742 1743 1744
  return 0;
}

L
Liu Jicong 已提交
1745 1746
// Handle a queued response that is not a poll response.
//
// Only TMQ_MSG_TYPE__EP_RSP is understood. When the response carries an epoch newer
// than the consumer's current one, the endpoint information is applied through
// tmqUpdateEp and *pReset is set to true; otherwise the response content is released
// and *pReset is set to false.
//
// Returns 0 for an EP response, -1 for any other type (*pReset is left untouched then).
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  // not newer than what the consumer already has: drop it
  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    tmqFreeRspWrapper(rspWrapper);
    *pReset = false;
    return 0;
  }

  SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
  SMqAskEpRsp*        pEpRsp = &pEpWrapper->msg;

  tmqUpdateEp(tmq, rspWrapper->epoch, pEpRsp);
  tDeleteSMqAskEpRsp(pEpRsp);
  *pReset = true;
  return 0;
}

L
Liu Jicong 已提交
1765
// Drain queued responses until one can be returned to the user.
//
// Items are taken from tmq->qall; when that is empty, everything currently in
// tmq->mqueue is moved into it first. Poll responses whose epoch differs from the
// consumer's current epoch are discarded, and responses with zero blocks are dropped
// after the vgroup offset/status has been updated. Any other response type is passed
// to tmqHandleNoPollRsp.
//
// tmq:         consumer handle.
// timeout:     only used when a re-poll is triggered (forwarded to tmqPollImpl).
// pollIfReset: when true, tmqPollImpl is called again after tmqHandleNoPollRsp
//              reports a reset.
// Returns a result object built from the first usable poll response, or NULL when
// both queues are empty or an END response was found (terrno is then set to
// TSDB_CODE_TQ_NO_COMMITTED_OFFSET).
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);

    if (rspWrapper == NULL) {
      // local batch is exhausted: refill it from the shared queue and try once more
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) {
        return NULL;
      }
    }

    tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper);

    const int8_t rspType = rspWrapper->tmqRspType;

    if (rspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPollRsp = (SMqPollRspWrapper*)rspWrapper;
      tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId);

      int32_t curEpoch = atomic_load_32(&tmq->epoch);
      if (pPollRsp->dataRsp.head.epoch != curEpoch) {
        tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pPollRsp->dataRsp.head.epoch, curEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pPollRsp);
        continue;
      }

      // record the new offset and allow the vgroup to be polled again
      SMqClientVg* pVgroup = pPollRsp->vgHandle;
      pVgroup->currentOffset = pPollRsp->dataRsp.rspOffset;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPollRsp->dataRsp.blockNum == 0) {
        // NOTE(review): only the queue item is freed here, tmqFreeRspWrapper is not
        // called as on the mismatch path - confirm an empty response owns nothing
        taosFreeQitem(pPollRsp);
        continue;
      }

      SMqRspObj* pDataRes = tmqBuildRspFromWrapper(pPollRsp);
      taosFreeQitem(pPollRsp);
      return pDataRes;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPollRsp = (SMqPollRspWrapper*)rspWrapper;

      int32_t curEpoch = atomic_load_32(&tmq->epoch);
      if (pPollRsp->metaRsp.head.epoch != curEpoch) {
        tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pPollRsp->metaRsp.head.epoch, curEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pPollRsp);
        continue;
      }

      SMqClientVg* pVgroup = pPollRsp->vgHandle;
      pVgroup->currentOffset = pPollRsp->metaRsp.rspOffset;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaRes = tmqBuildMetaRspFromWrapper(pPollRsp);
      taosFreeQitem(pPollRsp);
      return pMetaRes;
    }

    if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pPollRsp = (SMqPollRspWrapper*)rspWrapper;

      int32_t curEpoch = atomic_load_32(&tmq->epoch);
      if (pPollRsp->taosxRsp.head.epoch != curEpoch) {
        tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pPollRsp->taosxRsp.head.epoch, curEpoch);
        tmqFreeRspWrapper(rspWrapper);
        taosFreeQitem(pPollRsp);
        continue;
      }

      SMqClientVg* pVgroup = pPollRsp->vgHandle;
      pVgroup->currentOffset = pPollRsp->taosxRsp.rspOffset;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPollRsp->taosxRsp.blockNum == 0) {
        taosFreeQitem(pPollRsp);
        continue;
      }

      // without created tables the response is exposed as a plain data result
      void* pTaosxRes = NULL;
      if (pPollRsp->taosxRsp.createTableNum == 0) {
        pTaosxRes = tmqBuildRspFromWrapper(pPollRsp);
      } else {
        pTaosxRes = tmqBuildTaosxRspFromWrapper(pPollRsp);
      }
      taosFreeQitem(pPollRsp);
      return pTaosxRes;
    }

    // everything else goes to the non-poll handler; its return code is not checked
    bool needReset = false;
    tmqHandleNoPollRsp(tmq, rspWrapper, &needReset);
    taosFreeQitem(rspWrapper);
    if (pollIfReset && needReset) {
      tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1875
// Poll the consumer for the next result.
//
// tmq:     consumer handle.
// timeout: compared against elapsed time measured with taosGetTimestampMs, i.e. it
//          is in milliseconds; -1 disables the deadline (the loop then wakes up
//          every 1000 ms to poll again).
// Returns a result object, or NULL when
//   - the consumer is still in TMQ_CONSUMER_STATUS__INIT,
//   - the consumer is in RECOVER status and tmqAskEp keeps answering
//     TSDB_CODE_MND_CONSUMER_NOT_READY after 10 retries (500 ms apart),
//   - terrno equals TSDB_CODE_TQ_NO_COMMITTED_OFFSET after an empty read, or
//   - the timeout elapsed without a response.
// NOTE(review): terrno is not cleared before tmqHandleAllRsp is called, so a value
// left over from an earlier call would also take the "no committed offset" exit -
// confirm whether that is intended.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime);

  // consumer is still in INIT status: return without polling
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 ", poll return since consumer is init", tmq->consumerId);
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    const int32_t maxRetry = 10;  // matches the "/10" printed in the log line below
    int32_t       retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      // check the budget before counting the next attempt so that exactly maxRetry
      // retries are made and the log never reports more than 10/10
      if (retryCnt >= maxRetry) {
        return NULL;
      }
      retryCnt++;

      tscDebug("consumer:0x%"PRIx64" not ready, retry:%d/10 in 500ms", tmq->consumerId, retryCnt);
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);

    // a failed broadcast is only logged; responses already queued are still consumed
    if (tmqPollImpl(tmq, timeout) < 0) {
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout != -1) {
      int64_t currentTime = taosGetTimestampMs();
      int64_t passedTime = currentTime - startTime;
      if (passedTime > timeout) {
        tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
                 tmq->consumerId, tmq->epoch, startTime, currentTime);
        return NULL;
      }
      // sleep for the remaining budget, or until a response callback posts rspSem
      tsem_timewait(&tmq->rspSem, (timeout - passedTime));
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
    }
  }
}

L
Liu Jicong 已提交
1944
// Close a consumer.
//
// When the consumer is in TMQ_CONSUMER_STATUS__READY, the current offsets are
// committed synchronously and the consumer then subscribes to an empty topic list,
// retrying every 500 ms while the answer is TSDB_CODE_MND_CONSUMER_NOT_READY
// (the loop gives up once retryCnt exceeds 5). Finally the consumer's reference is
// removed from the tmq reference set.
//
// Returns 0, or the error code of tmq_commit_sync when the commit fails.
// NOTE(review): on a failed commit the function returns before taosRemoveRef, so the
// consumer is not released - confirm callers are expected to call close again.
int32_t tmq_consumer_close(tmq_t* tmq) {
  // status is read atomically, as tmq_consumer_poll does for the same field
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();  // left empty on purpose
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      } else {
        retryCnt++;
        taosMsleep(500);
      }
    }

    tmq_list_destroy(lst);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1968

L
Liu Jicong 已提交
1969 1970
// Translate a tmq return code into a message: 0 maps to "success", -1 to "fail",
// and every other value is passed to tstrerror.
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1978

L
Liu Jicong 已提交
1979 1980 1981 1982 1983
// Classify a result handle returned by the consumer: data, table meta, meta+data,
// or TMQ_RES_INVALID when none of the TD_RES_TMQ* checks matches.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
1991
// Return the topic name of a tmq result.
//
// The stored name is returned without its leading part: everything up to and
// including the first '.' is skipped. If the stored name contains no '.', it is
// returned unchanged (previously this case computed a pointer from NULL).
//
// Returns NULL when res is not a tmq data, meta or meta+data result. The returned
// pointer refers to memory inside res.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2006 2007 2008 2009
// Return the database name of a tmq result.
//
// The stored name is returned without its leading part: everything up to and
// including the first '.' is skipped. If the stored name contains no '.', it is
// returned unchanged (previously this case computed a pointer from NULL).
//
// Returns NULL when res is not a tmq data, meta or meta+data result. The returned
// pointer refers to memory inside res.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2021 2022 2023 2024
// Return the vgroup id stored in a tmq result, or -1 when res is not a tmq data,
// meta or meta+data result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
2035 2036 2037 2038 2039 2040 2041 2042

// Return the table name of the block the result iterator currently points at.
//
// Supported for data and meta+data results. NULL is returned when the response was
// sent without table names, when the name array is missing, when resIter is outside
// [0, blockNum), or when res is any other kind of result.
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       hasName = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
    bool       inRange = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
    if (!(hasName && inRange)) {
      return NULL;
    }
    return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pMetaData = (SMqTaosxRspObj*)res;
    bool            hasName = pMetaData->rsp.withTbName && pMetaData->rsp.blockTbName != NULL;
    bool            inRange = pMetaData->resIter >= 0 && pMetaData->resIter < pMetaData->rsp.blockNum;
    if (!(hasName && inRange)) {
      return NULL;
    }
    return (const char*)taosArrayGetP(pMetaData->rsp.blockTbName, pMetaData->resIter);
  }

  return NULL;
}
2054

L
Liu Jicong 已提交
2055
// Commit offsets without returning a result code: forwards to tmqCommitInner with
// the caller's callback and parameter. The value returned by tmqCommitInner is not
// used here.
// NOTE(review): the literal arguments (0, 1) are presumably the "automatic" and
// "async" flags of tmqCommitInner - confirm against its definition.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

2060 2061
// Commit offsets and return the result code of tmqCommitInner; no callback is used.
// NOTE(review): the literal arguments (0, 0) are presumably the "automatic" and
// "async" flags of tmqCommitInner - confirm against its definition.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}