clientTmq.c 60.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34
#if 0
#undef tsem_post
#define tsem_post(x)                                         \
  tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \
  sem_post(x)
#endif

35
struct SMqMgmt{
36 37 38
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
39
};
L
Liu Jicong 已提交
40

41 42 43
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT;   // initialize only once
volatile int32_t    tmqInitRes = 0;                // initialize rsp code
static struct SMqMgmt tmqMgmt = {0};
44

L
Liu Jicong 已提交
45 46 47 48 49 50
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
51 52 53
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
54 55
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
56
struct tmq_list_t {
L
Liu Jicong 已提交
57
  SArray container;
L
Liu Jicong 已提交
58
};
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
struct tmq_conf_t {
61 62 63 64 65
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
66 67
  int8_t  snapEnable;
  int32_t snapBatchSize;
68
  bool    hbBgEnable;
69

70 71 72 73 74
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
75
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
76
  void*          commitCbUserParam;
L
Liu Jicong 已提交
77 78 79
};

struct tmq_t {
80
  int64_t  refId;
L
Liu Jicong 已提交
81
  // conf
82 83 84 85 86 87 88 89 90
  char     groupId[TSDB_CGROUP_LEN];
  char     clientId[256];
  int8_t   withTbName;
  int8_t   useSnapshot;
  int8_t   autoCommit;
  int32_t  autoCommitInterval;
  int32_t  resetOffsetCfg;
  uint64_t consumerId;
  bool     hbBgEnable;
91

L
Liu Jicong 已提交
92 93
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
94 95 96 97

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
98 99
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
100
  int32_t epSkipCnt;
L
Liu Jicong 已提交
101
#endif
L
Liu Jicong 已提交
102 103
  int64_t pollCnt;

L
Liu Jicong 已提交
104
  // timer
105 106
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
107 108 109
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
110 111 112 113
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
114
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
115
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
116
  STaosQall*  qall;
L
Liu Jicong 已提交
117 118 119 120
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
121 122
};

X
Xiaoyu Wang 已提交
123 124 125 126 127 128 129 130
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
131
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
132
  TMQ_CONSUMER_STATUS__RECOVER,
L
Liu Jicong 已提交
133 134
};

L
Liu Jicong 已提交
135
enum {
136
  TMQ_DELAYED_TASK__ASK_EP = 1,
L
Liu Jicong 已提交
137 138 139 140
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
141
typedef struct {
142 143 144
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
145 146
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
147
  // connection info
148
  int32_t vgId;
X
Xiaoyu Wang 已提交
149
  int32_t vgStatus;
L
Liu Jicong 已提交
150
  int32_t vgSkipCnt;
151 152 153
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
154
typedef struct {
155
  // subscribe info
156 157
  char topicName[TSDB_TOPIC_FNAME_LEN];
  char db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
158 159 160

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
161
  SSchemaWrapper schema;
162 163
} SMqClientTopic;

L
Liu Jicong 已提交
164 165 166 167 168
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
169
  union {
L
Liu Jicong 已提交
170 171
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
172
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
173
  };
L
Liu Jicong 已提交
174 175
} SMqPollRspWrapper;

L
Liu Jicong 已提交
176
typedef struct {
177 178
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
179 180
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
181
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
182

L
Liu Jicong 已提交
183
typedef struct {
184 185
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
186
  int32_t code;
L
Liu Jicong 已提交
187
  int32_t async;
X
Xiaoyu Wang 已提交
188
  tsem_t  rspSem;
189 190
} SMqAskEpCbParam;

L
Liu Jicong 已提交
191
typedef struct {
192 193
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
194
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
195
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
196
  int32_t         vgId;
L
Liu Jicong 已提交
197
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
198
} SMqPollCbParam;
199

200
typedef struct {
201 202
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
203 204
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
205 206
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
207
  int32_t        rspErr;
208
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
209 210 211 212
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
213 214 215 216 217
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
L
Liu Jicong 已提交
218 219
  /*char                 topicName[TSDB_TOPIC_FNAME_LEN];*/
  /*int32_t              vgId;*/
220
} SMqCommitCbParam;
221

222 223
static int32_t tmqAskEp(tmq_t* tmq, bool async);

224
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
225
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
226 227 228 229 230
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

231
  conf->withTbName = false;
L
Liu Jicong 已提交
232
  conf->autoCommit = true;
L
Liu Jicong 已提交
233
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
234
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
235
  conf->hbBgEnable = true;
236

237 238 239
  return conf;
}

L
Liu Jicong 已提交
240
void tmq_conf_destroy(tmq_conf_t* conf) {
L
Liu Jicong 已提交
241 242 243 244 245 246
  if (conf) {
    if (conf->ip) taosMemoryFree(conf->ip);
    if (conf->user) taosMemoryFree(conf->user);
    if (conf->pass) taosMemoryFree(conf->pass);
    taosMemoryFree(conf);
  }
L
Liu Jicong 已提交
247 248 249
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
250
  if (strcmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
251
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
252
    return TMQ_CONF_OK;
253
  }
L
Liu Jicong 已提交
254

255
  if (strcmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
256
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
257 258
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
259

L
Liu Jicong 已提交
260 261
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
262
      conf->autoCommit = true;
L
Liu Jicong 已提交
263 264
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
265
      conf->autoCommit = false;
L
Liu Jicong 已提交
266 267 268 269
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
270
  }
L
Liu Jicong 已提交
271

L
Liu Jicong 已提交
272 273 274 275 276
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
277 278 279 280 281 282 283 284 285 286 287 288 289 290
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
291

292 293
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
294
      conf->withTbName = true;
L
Liu Jicong 已提交
295
      return TMQ_CONF_OK;
296
    } else if (strcmp(value, "false") == 0) {
297
      conf->withTbName = false;
L
Liu Jicong 已提交
298
      return TMQ_CONF_OK;
299 300 301 302 303
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
304
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
305
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
306
      conf->snapEnable = true;
307 308
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
309
      conf->snapEnable = false;
310 311 312 313 314 315
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
316 317 318 319 320
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

321 322 323
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
324 325
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
326
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
327 328 329 330
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
331
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
332 333
  }

L
Liu Jicong 已提交
334
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
335 336 337
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
338
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
339 340 341
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
342
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
343 344 345
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
346
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
347 348 349
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
350
  if (strcmp(key, "td.connect.db") == 0) {
351
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
352 353 354
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
355
  return TMQ_CONF_UNKNOWN;
356 357 358
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
359 360
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
361 362
}

L
Liu Jicong 已提交
363 364
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
365 366 367 368 369
  if (src == NULL || src[0] == 0) return -1;
  char* topic = strdup(src);
  if (topic[0] != '`') {
    strtolower(topic, src);
  }
L
fix  
Liu Jicong 已提交
370
  if (taosArrayPush(container, &topic) == NULL) return -1;
371 372 373
  return 0;
}

L
Liu Jicong 已提交
374
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
375
  SArray* container = &list->container;
L
Liu Jicong 已提交
376
  taosArrayDestroyP(container, taosMemoryFree);
L
Liu Jicong 已提交
377 378
}

L
Liu Jicong 已提交
379 380 381 382 383 384 385 386 387 388
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return container->pData;
}

L
Liu Jicong 已提交
389 390 391 392
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId);
  if (tmq == NULL) {
    if (!pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
    }
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // if no more waiting rsp
  if (pParamSet->async) {
    // call async cb func
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      // sem post
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

#if 0
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
  return 0;
}

L
Liu Jicong 已提交
425 426
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
427
  ASSERT(waitingRspNum >= 0);
L
Liu Jicong 已提交
428 429 430 431 432
  if (waitingRspNum == 0) {
    tmqCommitDone(pParamSet);
  }
}

433 434
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
435 436
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
  // push into array
L
Liu Jicong 已提交
437
#if 0
438 439 440 441 442
  if (code == 0) {
    taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
  } else {
    taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
  }
L
Liu Jicong 已提交
443
#endif
L
Liu Jicong 已提交
444

L
Liu Jicong 已提交
445
  taosMemoryFree(pParam->pOffset);
L
Liu Jicong 已提交
446
  taosMemoryFree(pBuf->pData);
dengyihao's avatar
dengyihao 已提交
447
  taosMemoryFree(pBuf->pEpSet);
L
Liu Jicong 已提交
448

S
Shengliang Guan 已提交
449
  /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
L
Liu Jicong 已提交
450 451
   * pOffset->version);*/

L
Liu Jicong 已提交
452
  tmqCommitRspCountDown(pParamSet);
453 454 455 456

  return 0;
}

L
Liu Jicong 已提交
457 458 459 460 461 462
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
L
Liu Jicong 已提交
463
  pOffset->val = pVg->currentOffset;
464

L
Liu Jicong 已提交
465 466 467 468
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
L
Liu Jicong 已提交
469

L
Liu Jicong 已提交
470 471 472 473 474 475 476
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    return -1;
  }
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
L
Liu Jicong 已提交
477 478 479 480
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    return -1;
  }
L
Liu Jicong 已提交
481
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
482

L
Liu Jicong 已提交
483 484 485 486 487
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
L
Liu Jicong 已提交
488
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
489 490

  // build param
491
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
492
  if (pParam == NULL) {
L
Liu Jicong 已提交
493
    taosMemoryFree(pOffset);
L
Liu Jicong 已提交
494 495 496
    taosMemoryFree(buf);
    return -1;
  }
L
Liu Jicong 已提交
497 498 499 500 501 502
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
L
Liu Jicong 已提交
503
    taosMemoryFree(pOffset);
L
Liu Jicong 已提交
504 505
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
506 507 508 509 510 511 512 513
    return -1;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

S
Shengliang Guan 已提交
514 515
  tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
           pVg->vgId, pOffset->val.version);
L
Liu Jicong 已提交
516 517

  // TODO: put into cb
L
Liu Jicong 已提交
518
  pVg->committedOffset = pVg->currentOffset;
L
Liu Jicong 已提交
519 520 521 522

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
L
Liu Jicong 已提交
523
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
524
  pMsgSendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
525
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
L
Liu Jicong 已提交
526 527
  // send msg

L
Liu Jicong 已提交
528 529 530
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

L
Liu Jicong 已提交
531 532 533 534 535 536 537 538
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;
}

int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
539
  ASSERT(msg != NULL);
L
Liu Jicong 已提交
540 541 542 543 544 545 546 547
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
L
Liu Jicong 已提交
548
  } else if (TD_RES_TMQ_METADATA(msg)) {
549 550 551
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
L
Liu Jicong 已提交
552 553 554 555 556 557 558 559 560
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
561 562
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
L
Liu Jicong 已提交
563 564 565 566 567 568
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

L
Liu Jicong 已提交
569 570
  int32_t code = -1;

L
Liu Jicong 已提交
571 572 573 574 575 576 577
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

L
Liu Jicong 已提交
578
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
L
Liu Jicong 已提交
579
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
L
Liu Jicong 已提交
580 581
          tsem_destroy(&pParamSet->rspSem);
          taosMemoryFree(pParamSet);
L
Liu Jicong 已提交
582
          goto FAIL;
L
Liu Jicong 已提交
583
        }
L
Liu Jicong 已提交
584
        goto HANDLE_RSP;
L
Liu Jicong 已提交
585 586
      }
    }
L
Liu Jicong 已提交
587
  }
L
Liu Jicong 已提交
588

L
Liu Jicong 已提交
589 590 591 592
HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
L
Liu Jicong 已提交
593 594 595
    return 0;
  }

L
Liu Jicong 已提交
596 597 598 599
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
L
Liu Jicong 已提交
600
    taosMemoryFree(pParamSet);
L
Liu Jicong 已提交
601 602 603 604 605 606 607 608 609 610 611 612
    return code;
  } else {
    code = 0;
  }

FAIL:
  if (code != 0 && async) {
    userCb(tmq, code, userParam);
  }
  return 0;
}

L
Liu Jicong 已提交
613 614
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                                     void* userParam) {
L
Liu Jicong 已提交
615 616
  int32_t code = -1;

617 618
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
L
Liu Jicong 已提交
619 620 621 622 623 624 625 626
    code = TSDB_CODE_OUT_OF_MEMORY;
    if (async) {
      if (automatic) {
        tmq->commitCb(tmq, code, tmq->commitCbUserParam);
      } else {
        userCb(tmq, code, userParam);
      }
    }
627 628
    return -1;
  }
629 630 631 632

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

633 634 635 636 637 638
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

639 640 641
  // init as 1 to prevent concurrency issue
  pParamSet->waitingRspNum = 1;

642 643
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
644

S
Shengliang Guan 已提交
645
    tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
L
Liu Jicong 已提交
646 647
             (int32_t)taosArrayGetSize(pTopic->vgs));

648
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
649 650
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

S
Shengliang Guan 已提交
651 652
      tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
               pVg->vgId);
L
Liu Jicong 已提交
653

L
Liu Jicong 已提交
654
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
655
        tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId,
L
Liu Jicong 已提交
656
                 pVg->currentOffset.version, pVg->committedOffset.version);
L
Liu Jicong 已提交
657 658 659
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
660 661 662 663
      }
    }
  }

L
Liu Jicong 已提交
664
  // no request is sent
L
Liu Jicong 已提交
665 666 667 668 669 670
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

L
Liu Jicong 已提交
671 672
  // count down since waiting rsp num init as 1
  tmqCommitRspCountDown(pParamSet);
673

674 675 676 677
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
678
    taosMemoryFree(pParamSet);
L
Liu Jicong 已提交
679
#if 0
680 681
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
L
Liu Jicong 已提交
682
#endif
L
Liu Jicong 已提交
683
  }
684

L
Liu Jicong 已提交
685 686 687 688 689 690 691 692 693 694
  return code;
}

int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (msg) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  } else {
    return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
  }
695 696
}

697
void tmqAssignAskEpTask(void* param, void* tmrId) {
698 699 700
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
701
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
702 703 704 705 706
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
707 708 709
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
710 711 712
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
713
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
714 715 716 717 718
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
719 720 721
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
722 723 724
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
725
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
726 727 728 729 730
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
731 732
}

733
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
dengyihao's avatar
dengyihao 已提交
734 735 736 737
  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
738 739 740 741
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
742 743 744
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
745
    taosMemoryFree(param);
746 747
    return;
  }
D
dapan1121 已提交
748 749 750 751 752

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
753
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
754 755 756 757
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
    return;
  }
L
Liu Jicong 已提交
758
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
759 760 761 762 763 764 765 766 767
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
    return;
  }
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
    return;
  }
768 769 770 771

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
772
    goto OVER;
773 774 775
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
D
dapan1121 已提交
776
      .len = tlen,
777 778 779 780 781 782 783
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
784
  sendInfo->msgType = TDMT_MND_TMQ_HB;
785 786 787 788 789 790 791

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
792
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
793 794
}

L
Liu Jicong 已提交
795 796 797 798 799 800 801 802
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

803
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
L
Liu Jicong 已提交
804
      tmqAskEp(tmq, true);
805 806 807 808 809

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      *pRefId = tmq->refId;

      taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer);
L
Liu Jicong 已提交
810
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
L
Liu Jicong 已提交
811
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
812 813 814 815 816

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      *pRefId = tmq->refId;

      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
L
Liu Jicong 已提交
817 818
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
819
      ASSERT(0);
L
Liu Jicong 已提交
820
    }
L
Liu Jicong 已提交
821
    taosFreeQitem(pTaskType);
L
Liu Jicong 已提交
822 823 824 825 826
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
    // do nothing
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
    SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
    SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
    taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
    taosArrayDestroy(pRsp->dataRsp.blockDataLen);
    taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
    taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
    taosMemoryFree(pRsp->metaRsp.metaRsp);
  } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
    taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
    taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
    taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
    taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
    // taosx
    taosArrayDestroy(pRsp->taosxRsp.createTableLen);
    taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
  }
}

L
Liu Jicong 已提交
854
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
855
  SMqRspWrapper* rspWrapper = NULL;
L
Liu Jicong 已提交
856
  while (1) {
L
Liu Jicong 已提交
857 858 859 860 861
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper) {
      tmqFreeRspWrapper(rspWrapper);
      taosFreeQitem(rspWrapper);
    } else {
L
Liu Jicong 已提交
862
      break;
L
Liu Jicong 已提交
863
    }
L
Liu Jicong 已提交
864 865
  }

L
Liu Jicong 已提交
866
  rspWrapper = NULL;
L
Liu Jicong 已提交
867 868
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
L
Liu Jicong 已提交
869 870 871 872 873
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper) {
      tmqFreeRspWrapper(rspWrapper);
      taosFreeQitem(rspWrapper);
    } else {
L
Liu Jicong 已提交
874
      break;
L
Liu Jicong 已提交
875
    }
L
Liu Jicong 已提交
876 877 878
  }
}

D
dapan1121 已提交
879
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
L
Liu Jicong 已提交
880 881
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
dengyihao's avatar
dengyihao 已提交
882 883

  taosMemoryFree(pMsg->pEpSet);
L
Liu Jicong 已提交
884 885 886
  tsem_post(&pParam->rspSem);
  return 0;
}
887

L
Liu Jicong 已提交
888
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
X
Xiaoyu Wang 已提交
889 890 891 892
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
L
Liu Jicong 已提交
893
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
894
    tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
X
Xiaoyu Wang 已提交
895
  }
L
Liu Jicong 已提交
896
  return 0;
X
Xiaoyu Wang 已提交
897 898
}

L
Liu Jicong 已提交
899
int32_t tmq_unsubscribe(tmq_t* tmq) {
L
Liu Jicong 已提交
900 901
  int32_t     rsp;
  int32_t     retryCnt = 0;
L
Liu Jicong 已提交
902
  tmq_list_t* lst = tmq_list_new();
L
Liu Jicong 已提交
903 904 905 906 907 908 909 910 911 912
  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    } else {
      retryCnt++;
      taosMsleep(500);
    }
  }

L
Liu Jicong 已提交
913 914
  tmq_list_destroy(lst);
  return rsp;
X
Xiaoyu Wang 已提交
915 916
}

917 918
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
919

920
  // TODO stop timer
L
Liu Jicong 已提交
921 922 923 924
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
925
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
L
Liu Jicong 已提交
926
  taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
927

928
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
929

930 931 932
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
933
    taosMemoryFreeClear(pTopic->schema.pSchema);
934 935 936 937 938
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
939 940
}

941 942 943 944 945 946 947 948 949 950 951 952 953 954
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
  if (tmqMgmt.rsetId != 0) {
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
955
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
956 957 958 959
  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
L
Liu Jicong 已提交
960 961
  }

L
Liu Jicong 已提交
962 963
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
L
Liu Jicong 已提交
964
    terrno = TSDB_CODE_OUT_OF_MEMORY;
965
    tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr());
L
Liu Jicong 已提交
966 967
    return NULL;
  }
L
Liu Jicong 已提交
968

L
Liu Jicong 已提交
969 970 971
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

972
  ASSERT(conf->groupId[0]);
L
Liu Jicong 已提交
973

L
Liu Jicong 已提交
974 975 976 977 978 979
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
L
Liu Jicong 已提交
980
    terrno = TSDB_CODE_OUT_OF_MEMORY;
981
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
S
Shengliang Guan 已提交
982
             pTmq->groupId);
L
Liu Jicong 已提交
983 984
    goto FAIL;
  }
L
Liu Jicong 已提交
985

L
Liu Jicong 已提交
986 987
  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
L
Liu Jicong 已提交
988 989
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
L
Liu Jicong 已提交
990 991
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/
L
Liu Jicong 已提交
992

L
Liu Jicong 已提交
993 994 995
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
996
  pTmq->withTbName = conf->withTbName;
L
Liu Jicong 已提交
997
  pTmq->useSnapshot = conf->snapEnable;
L
Liu Jicong 已提交
998
  pTmq->autoCommit = conf->autoCommit;
L
Liu Jicong 已提交
999
  pTmq->autoCommitInterval = conf->autoCommitInterval;
L
Liu Jicong 已提交
1000 1001
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
L
Liu Jicong 已提交
1002 1003
  pTmq->resetOffsetCfg = conf->resetOffset;

1004 1005
  pTmq->hbBgEnable = conf->hbBgEnable;

L
Liu Jicong 已提交
1006
  // assign consumerId
L
Liu Jicong 已提交
1007
  pTmq->consumerId = tGenIdPI64();
X
Xiaoyu Wang 已提交
1008

L
Liu Jicong 已提交
1009 1010
  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
1011
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
S
Shengliang Guan 已提交
1012
             pTmq->groupId);
L
Liu Jicong 已提交
1013 1014
    goto FAIL;
  }
L
Liu Jicong 已提交
1015

L
Liu Jicong 已提交
1016 1017 1018
  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
1019
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
S
Shengliang Guan 已提交
1020
             pTmq->groupId);
L
Liu Jicong 已提交
1021 1022 1023
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }
L
Liu Jicong 已提交
1024

1025 1026 1027 1028 1029 1030
  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    tmqFreeImpl(pTmq);
    return NULL;
  }

1031
  if (pTmq->hbBgEnable) {
L
Liu Jicong 已提交
1032 1033
    int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
    *pRefId = pTmq->refId;
1034
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
1035 1036
  }

1037
  tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId);
1038
  return pTmq;
L
Liu Jicong 已提交
1039 1040 1041 1042 1043 1044 1045

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
1046

L
Liu Jicong 已提交
1047
  return NULL;
1048 1049
}

L
Liu Jicong 已提交
1050
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
L
Liu Jicong 已提交
1051 1052 1053
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
L
Liu Jicong 已提交
1054
  SMsgSendInfo*   sendInfo = NULL;
L
Liu Jicong 已提交
1055
  SCMSubscribeReq req = {0};
1056
  int32_t         code = 0;
1057

1058
  tscDebug("consumer:0x%"PRIx64", tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);
L
Liu Jicong 已提交
1059

1060
  req.consumerId = tmq->consumerId;
L
Liu Jicong 已提交
1061
  tstrncpy(req.clientId, tmq->clientId, 256);
L
Liu Jicong 已提交
1062
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
1063 1064
  req.topicNames = taosArrayInit(sz, sizeof(void*));

1065 1066 1067 1068
  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
1069

L
Liu Jicong 已提交
1070 1071
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
1072 1073

    SName name = {0};
L
Liu Jicong 已提交
1074 1075 1076 1077
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
1078 1079
    }

1080 1081
    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%"PRIx64", subscribe topic: %s", tmq->consumerId, topicFName);
L
Liu Jicong 已提交
1082 1083

    taosArrayPush(req.topicNames, &topicFName);
1084 1085
  }

L
Liu Jicong 已提交
1086
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
1087

L
Liu Jicong 已提交
1088
  buf = taosMemoryMalloc(tlen);
1089 1090 1091 1092
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
L
Liu Jicong 已提交
1093

1094 1095 1096
  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

L
Liu Jicong 已提交
1097
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1098 1099 1100 1101
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }
1102

X
Xiaoyu Wang 已提交
1103
  SMqSubscribeCbParam param = {
L
Liu Jicong 已提交
1104
      .rspErr = 0,
1105 1106
      .refId = tmq->refId,
      .epoch = tmq->epoch,
X
Xiaoyu Wang 已提交
1107
  };
L
Liu Jicong 已提交
1108

1109 1110 1111
  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    goto FAIL;
  }
L
Liu Jicong 已提交
1112 1113

  sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1114 1115 1116 1117
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
1118

L
Liu Jicong 已提交
1119 1120
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1121 1122
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
L
Liu Jicong 已提交
1123
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;
L
Liu Jicong 已提交
1124

1125 1126 1127 1128 1129
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
1130 1131
  // avoid double free if msg is sent
  buf = NULL;
L
Liu Jicong 已提交
1132
  sendInfo = NULL;
L
Liu Jicong 已提交
1133

L
Liu Jicong 已提交
1134 1135
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
1136

1137 1138 1139 1140
  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }
L
Liu Jicong 已提交
1141

L
Liu Jicong 已提交
1142
  int32_t retryCnt = 0;
L
Liu Jicong 已提交
1143
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
L
Liu Jicong 已提交
1144 1145 1146
    if (retryCnt++ > 10) {
      goto FAIL;
    }
1147 1148

    tscDebug("consumer:0x%"PRIx64", mnd not ready for subscribe, retry count:%d in 500ms", tmq->consumerId, retryCnt);
L
Liu Jicong 已提交
1149 1150
    taosMsleep(500);
  }
1151

1152 1153
  // init ep timer
  if (tmq->epTimer == NULL) {
1154 1155 1156
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
1157
  }
L
Liu Jicong 已提交
1158 1159

  // init auto commit timer
1160
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
1161 1162 1163
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
L
Liu Jicong 已提交
1164 1165
  }

L
Liu Jicong 已提交
1166
FAIL:
L
Liu Jicong 已提交
1167
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
L
Liu Jicong 已提交
1168
  taosMemoryFree(buf);
L
Liu Jicong 已提交
1169
  taosMemoryFree(sendInfo);
L
Liu Jicong 已提交
1170

L
Liu Jicong 已提交
1171
  return code;
1172 1173
}

L
Liu Jicong 已提交
1174
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
L
Liu Jicong 已提交
1175
  //
1176
  conf->commitCb = cb;
L
Liu Jicong 已提交
1177
  conf->commitCbUserParam = param;
L
Liu Jicong 已提交
1178
}
1179

D
dapan1121 已提交
1180
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
1181 1182
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
1183
  SMqClientTopic* pTopic = pParam->pTopic;
1184 1185 1186 1187 1188 1189

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
dengyihao's avatar
dengyihao 已提交
1190
    taosMemoryFree(pMsg->pEpSet);
1191 1192 1193 1194 1195 1196
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
L
Liu Jicong 已提交
1197
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
1198
  if (code != 0) {
L
Liu Jicong 已提交
1199
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
L
Liu Jicong 已提交
1200
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
dengyihao's avatar
dengyihao 已提交
1201 1202
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

L
Liu Jicong 已提交
1203 1204 1205 1206
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
L
Liu Jicong 已提交
1207
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
S
Shengliang Guan 已提交
1208
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
L
Liu Jicong 已提交
1209
      if (pRspWrapper == NULL) {
S
Shengliang Guan 已提交
1210
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
L
Liu Jicong 已提交
1211 1212 1213 1214 1215 1216 1217 1218
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      /*pRspWrapper->vgHandle = pVg;*/
      /*pRspWrapper->topicHandle = pTopic;*/
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
L
fix txn  
Liu Jicong 已提交
1219
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1220 1221
  }

X
Xiaoyu Wang 已提交
1222 1223 1224
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
1225
    // do not write into queue since updating epoch reset
S
Shengliang Guan 已提交
1226
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
1227
            tmqEpoch);
1228
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1229
    taosMemoryFree(pMsg->pData);
dengyihao's avatar
dengyihao 已提交
1230
    taosMemoryFree(pMsg->pEpSet);
X
Xiaoyu Wang 已提交
1231 1232 1233 1234
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
S
Shengliang Guan 已提交
1235
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
1236 1237
  }

L
Liu Jicong 已提交
1238 1239 1240
  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

S
Shengliang Guan 已提交
1241
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
L
Liu Jicong 已提交
1242
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1243
    taosMemoryFree(pMsg->pData);
dengyihao's avatar
dengyihao 已提交
1244
    taosMemoryFree(pMsg->pEpSet);
S
Shengliang Guan 已提交
1245
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
1246
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1247
  }
L
Liu Jicong 已提交
1248

L
Liu Jicong 已提交
1249
  pRspWrapper->tmqRspType = rspType;
L
Liu Jicong 已提交
1250 1251
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
1252

L
Liu Jicong 已提交
1253
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1254 1255 1256
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
wmmhello's avatar
wmmhello 已提交
1257
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1258
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1259 1260 1261 1262 1263 1264

    tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
1265 1266 1267 1268
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
1269
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1270 1271 1272 1273 1274 1275 1276
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
1277
    ASSERT(0);
L
Liu Jicong 已提交
1278
  }
L
Liu Jicong 已提交
1279

L
Liu Jicong 已提交
1280
  taosMemoryFree(pMsg->pData);
dengyihao's avatar
dengyihao 已提交
1281
  taosMemoryFree(pMsg->pEpSet);
L
Liu Jicong 已提交
1282

L
Liu Jicong 已提交
1283 1284
  tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);

L
Liu Jicong 已提交
1285
  taosWriteQitem(tmq->mqueue, pRspWrapper);
1286
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1287

L
Liu Jicong 已提交
1288
  return 0;
L
fix txn  
Liu Jicong 已提交
1289
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
1290
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
1291 1292
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
1293
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1294
  return -1;
1295 1296
}

L
Liu Jicong 已提交
1297
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
1298 1299 1300 1301
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
S
Shengliang Guan 已提交
1302
  tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch,
1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
S
Shengliang Guan 已提交
1321
      tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
1322 1323 1324
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
L
Liu Jicong 已提交
1325
        char buf[80];
L
Liu Jicong 已提交
1326
        tFormatOffset(buf, 80, &pVgCur->currentOffset);
L
Liu Jicong 已提交
1327 1328
        tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
L
Liu Jicong 已提交
1329
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
1330 1331 1332 1333 1334 1335 1336 1337
      }
    }
  }

  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
L
Liu Jicong 已提交
1338 1339
    pTopicEp->schema.nCols = 0;
    pTopicEp->schema.pSchema = NULL;
1340
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
1341 1342
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

S
Shengliang Guan 已提交
1343
    tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);
1344 1345 1346 1347 1348 1349

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
L
Liu Jicong 已提交
1350 1351
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
1352
      if (pOffset != NULL) {
L
Liu Jicong 已提交
1353
        offsetNew = *pOffset;
1354 1355 1356 1357
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1358
          .currentOffset = offsetNew,
1359 1360 1361 1362 1363 1364 1365 1366 1367 1368
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }
1369 1370 1371 1372
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
1373
      if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
1374
      taosArrayDestroy(pTopic->vgs);
L
Liu Jicong 已提交
1375
    }
1376
    taosArrayDestroy(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1377
  }
L
Liu Jicong 已提交
1378
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1379
  tmq->clientTopics = newTopics;
1380

1381 1382 1383 1384
  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1385

X
Xiaoyu Wang 已提交
1386 1387 1388 1389
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

D
dapan1121 已提交
1390
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
1391
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1392
  int8_t           async = pParam->async;
1393 1394 1395 1396 1397 1398 1399 1400 1401
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    if (!async) {
      tsem_destroy(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    taosMemoryFree(pMsg->pData);
dengyihao's avatar
dengyihao 已提交
1402
    taosMemoryFree(pMsg->pEpSet);
1403 1404 1405 1406
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

L
Liu Jicong 已提交
1407
  pParam->code = code;
1408
  if (code != 0) {
L
Liu Jicong 已提交
1409 1410
    tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d, code %x", tmq->consumerId,
             pParam->async, code);
L
Liu Jicong 已提交
1411
    goto END;
1412
  }
L
Liu Jicong 已提交
1413

L
Liu Jicong 已提交
1414
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1415
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1416
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1417 1418
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
S
Shengliang Guan 已提交
1419
  tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1420 1421
  if (head->epoch <= epoch) {
    goto END;
1422
  }
L
Liu Jicong 已提交
1423

L
Liu Jicong 已提交
1424
  if (!async) {
L
Liu Jicong 已提交
1425 1426
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
S
Shengliang Guan 已提交
1427 1428
    /*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
1429
    tmqUpdateEp(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1430
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1431
  } else {
S
Shengliang Guan 已提交
1432
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
L
Liu Jicong 已提交
1433
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1434
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1435 1436
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1437
    }
L
Liu Jicong 已提交
1438 1439 1440
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1441
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1442

L
Liu Jicong 已提交
1443
    taosWriteQitem(tmq->mqueue, pWrapper);
1444
    tsem_post(&tmq->rspSem);
1445
  }
L
Liu Jicong 已提交
1446 1447

END:
L
Liu Jicong 已提交
1448
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1449
  if (!async) {
L
Liu Jicong 已提交
1450
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
1451 1452
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
1453
  }
dengyihao's avatar
dengyihao 已提交
1454 1455

  taosMemoryFree(pMsg->pEpSet);
L
Liu Jicong 已提交
1456
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1457
  return code;
1458 1459
}

L
Liu Jicong 已提交
1460
int32_t tmqAskEp(tmq_t* tmq, bool async) {
1461
  int32_t code = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1462
#if 0
L
Liu Jicong 已提交
1463
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
L
fix txn  
Liu Jicong 已提交
1464
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
1465
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
S
Shengliang Guan 已提交
1466
    tscTrace("consumer:%" PRId64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
1467
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
1468
  }
L
temp  
Liu Jicong 已提交
1469
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
1470
#endif
1471

D
dapan1121 已提交
1472 1473 1474 1475 1476
  SMqAskEpReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
  strcpy(req.cgroup, tmq->groupId);

L
Liu Jicong 已提交
1477
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
D
dapan1121 已提交
1478
  if (tlen < 0) {
1479
    tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq failed", tmq->consumerId);
D
dapan1121 已提交
1480 1481
    return -1;
  }
1482

L
Liu Jicong 已提交
1483
  void* pReq = taosMemoryCalloc(1, tlen);
L
Liu Jicong 已提交
1484
  if (pReq == NULL) {
1485 1486
    tscError("consumer:0x%"PRIx64", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
1487 1488
    return -1;
  }
1489

D
dapan1121 已提交
1490
  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
1491
    tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
D
dapan1121 已提交
1492
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
1493
    return -1;
L
Liu Jicong 已提交
1494
  }
1495

L
Liu Jicong 已提交
1496
  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
1497
  if (pParam == NULL) {
1498
    tscError("consumer:0x%"PRIx64", failed to malloc subscribe param", tmq->consumerId);
D
dapan1121 已提交
1499
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
1500
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1501
    return -1;
L
Liu Jicong 已提交
1502
  }
1503

1504 1505
  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
L
Liu Jicong 已提交
1506
  pParam->async = async;
X
Xiaoyu Wang 已提交
1507
  tsem_init(&pParam->rspSem, 0, 0);
1508

1509
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1510 1511
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1512
    taosMemoryFree(pParam);
D
dapan1121 已提交
1513
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
1514
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1515 1516 1517 1518
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
D
dapan1121 已提交
1519
      .pData = pReq,
L
Liu Jicong 已提交
1520 1521 1522 1523 1524
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1525 1526 1527
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1528
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
1529

L
Liu Jicong 已提交
1530
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
1531
  tscDebug("consumer:0x%" PRIx64 ", ask ep from mnode", tmq->consumerId);
L
add log  
Liu Jicong 已提交
1532

L
Liu Jicong 已提交
1533 1534
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1535

L
Liu Jicong 已提交
1536
  if (!async) {
L
Liu Jicong 已提交
1537 1538 1539 1540
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    taosMemoryFree(pParam);
  }
1541

L
Liu Jicong 已提交
1542
  return code;
1543 1544
}

L
Liu Jicong 已提交
1545
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1546 1547 1548
  /*strcpy(pReq->topic, pTopic->topicName);*/
  /*strcpy(pReq->cgroup, tmq->groupId);*/

L
Liu Jicong 已提交
1549 1550 1551 1552
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, groupLen);
  pReq->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
1553

1554
  pReq->withTbName = tmq->withTbName;
1555
  pReq->timeout = timeout;
L
Liu Jicong 已提交
1556
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1557
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1558
  /*pReq->currentOffset = reqOffset;*/
L
Liu Jicong 已提交
1559
  pReq->reqOffset = pVg->currentOffset;
L
Liu Jicong 已提交
1560
  pReq->reqId = generateRequestId();
1561

L
Liu Jicong 已提交
1562 1563
  pReq->useSnapshot = tmq->useSnapshot;

D
dapan1121 已提交
1564
  pReq->head.vgId = pVg->vgId;
1565 1566
}

L
Liu Jicong 已提交
1567 1568
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
L
Liu Jicong 已提交
1569
  pRspObj->resType = RES_TYPE__TMQ_META;
L
Liu Jicong 已提交
1570 1571 1572 1573 1574 1575 1576 1577
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1578 1579 1580
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
L
Liu Jicong 已提交
1581 1582
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
1583
  pRspObj->vgId = pWrapper->vgHandle->vgId;
L
Liu Jicong 已提交
1584
  pRspObj->resIter = -1;
L
Liu Jicong 已提交
1585
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
L
Liu Jicong 已提交
1586

L
Liu Jicong 已提交
1587 1588
  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
L
Liu Jicong 已提交
1589
  if (!pWrapper->dataRsp.withSchema) {
L
Liu Jicong 已提交
1590 1591
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }
L
Liu Jicong 已提交
1592

L
Liu Jicong 已提交
1593
  return pRspObj;
X
Xiaoyu Wang 已提交
1594 1595
}

L
Liu Jicong 已提交
1596 1597
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
1598
  pRspObj->resType = RES_TYPE__TMQ_METADATA;
L
Liu Jicong 已提交
1599 1600 1601 1602
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
1603
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
L
Liu Jicong 已提交
1604 1605 1606

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
1607
  if (!pWrapper->taosxRsp.withSchema) {
L
Liu Jicong 已提交
1608 1609 1610 1611 1612 1613
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1614
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
X
Xiaoyu Wang 已提交
1615 1616 1617 1618 1619 1620
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1621
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1622 1623
        tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
                 vgSkipCnt);
X
Xiaoyu Wang 已提交
1624
        continue;
L
Liu Jicong 已提交
1625
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1626 1627 1628 1629
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
S
Shengliang Guan 已提交
1630
        tscDebug("consumer:%" PRId64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1631 1632
        }
#endif
X
Xiaoyu Wang 已提交
1633
      }
L
Liu Jicong 已提交
1634
      atomic_store_32(&pVg->vgSkipCnt, 0);
D
dapan1121 已提交
1635 1636 1637 1638 1639 1640 1641 1642 1643

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
L
Liu Jicong 已提交
1644
      char* msg = taosMemoryCalloc(1, msgSize);
D
dapan1121 已提交
1645 1646 1647 1648 1649
      if (NULL == msg) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
L
Liu Jicong 已提交
1650

D
dapan1121 已提交
1651 1652
      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1653
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1654
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1655 1656
        return -1;
      }
L
Liu Jicong 已提交
1657

wafwerar's avatar
wafwerar 已提交
1658
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1659
      if (pParam == NULL) {
D
dapan1121 已提交
1660
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1661
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1662
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1663 1664
        return -1;
      }
1665 1666 1667
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;

L
Liu Jicong 已提交
1668
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1669
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1670
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1671

1672
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1673
      if (sendInfo == NULL) {
D
dapan1121 已提交
1674
        taosMemoryFree(msg);
wafwerar's avatar
wafwerar 已提交
1675
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1676
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1677
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1678 1679 1680 1681
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
D
dapan1121 已提交
1682 1683
          .pData = msg,
          .len = msgSize,
X
Xiaoyu Wang 已提交
1684 1685
          .handle = NULL,
      };
D
dapan1121 已提交
1686
      sendInfo->requestId = req.reqId;
X
Xiaoyu Wang 已提交
1687
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1688
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1689
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1690
      sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
X
Xiaoyu Wang 已提交
1691 1692

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1693
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1694

1695
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1696
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
L
Liu Jicong 已提交
1697
      tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
D
dapan1121 已提交
1698
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
S
Shengliang Guan 已提交
1699
      /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1700 1701 1702 1703 1704 1705 1706 1707
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1708 1709
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1710
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1711 1712
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1713
      SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;
L
Liu Jicong 已提交
1714
      tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1715
      /*tmqClearUnhandleMsg(tmq);*/
L
Liu Jicong 已提交
1716
      tDeleteSMqAskEpRsp(rspMsg);
X
Xiaoyu Wang 已提交
1717 1718
      *pReset = true;
    } else {
L
Liu Jicong 已提交
1719
      tmqFreeRspWrapper(rspWrapper);
X
Xiaoyu Wang 已提交
1720 1721 1722 1723 1724 1725 1726 1727
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
1728
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1729
  while (1) {
L
Liu Jicong 已提交
1730 1731 1732
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1733
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1734
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
L
Liu Jicong 已提交
1735 1736

      if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1737
        /*tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId);*/
L
Liu Jicong 已提交
1738 1739
        return NULL;
      }
X
Xiaoyu Wang 已提交
1740 1741
    }

L
Liu Jicong 已提交
1742 1743
    tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper);

L
Liu Jicong 已提交
1744 1745 1746 1747 1748
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1749
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
1750
      tscDebug("consumer %" PRId64 " actual process poll rsp", tmq->consumerId);
L
Liu Jicong 已提交
1751
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
1752
      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1753
      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1754
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
S
Shengliang Guan 已提交
1755
        /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
L
Liu Jicong 已提交
1756
         * rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1757
        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
X
Xiaoyu Wang 已提交
1758
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1759
        if (pollRspWrapper->dataRsp.blockNum == 0) {
L
Liu Jicong 已提交
1760 1761
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1762 1763
          continue;
        }
L
Liu Jicong 已提交
1764
        // build rsp
L
Liu Jicong 已提交
1765
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1766
        taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
1767
        return pRsp;
X
Xiaoyu Wang 已提交
1768
      } else {
1769
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
L
Liu Jicong 已提交
1770
                 pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
L
Liu Jicong 已提交
1771
        tmqFreeRspWrapper(rspWrapper);
L
Liu Jicong 已提交
1772 1773 1774 1775 1776
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1777
      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1778
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
S
Shengliang Guan 已提交
1779
        /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
L
Liu Jicong 已提交
1780
         * rspMsg->msg.rspOffset);*/
wmmhello's avatar
wmmhello 已提交
1781
        pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
L
Liu Jicong 已提交
1782 1783
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
L
Liu Jicong 已提交
1784
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1785 1786 1787
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
1788
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
L
Liu Jicong 已提交
1789
                 pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
L
Liu Jicong 已提交
1790
        tmqFreeRspWrapper(rspWrapper);
L
Liu Jicong 已提交
1791
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1792
      }
L
Liu Jicong 已提交
1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
         * rspMsg->msg.rspOffset);*/
        pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
          continue;
        }
wmmhello's avatar
wmmhello 已提交
1808

L
Liu Jicong 已提交
1809
        // build rsp
wmmhello's avatar
wmmhello 已提交
1810
        void* pRsp = NULL;
L
Liu Jicong 已提交
1811
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
wmmhello's avatar
wmmhello 已提交
1812
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1813
        } else {
wmmhello's avatar
wmmhello 已提交
1814 1815
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
        }
L
Liu Jicong 已提交
1816 1817 1818
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
L
Liu Jicong 已提交
1819
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
L
Liu Jicong 已提交
1820
                 pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
L
Liu Jicong 已提交
1821
        tmqFreeRspWrapper(rspWrapper);
L
Liu Jicong 已提交
1822 1823
        taosFreeQitem(pollRspWrapper);
      }
X
Xiaoyu Wang 已提交
1824
    } else {
L
fix  
Liu Jicong 已提交
1825
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1826
      bool reset = false;
L
Liu Jicong 已提交
1827 1828
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1829
      if (pollIfReset && reset) {
S
Shengliang Guan 已提交
1830
        tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId);
1831
        tmqPollImpl(tmq, timeout);
X
Xiaoyu Wang 已提交
1832 1833 1834 1835 1836
      }
    }
  }
}

1837
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1838 1839
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1840

L
Liu Jicong 已提交
1841 1842
  tscDebug("consumer:%" PRId64 ", start poll at %" PRId64, tmq->consumerId, startTime);

1843 1844 1845
#if 0
  tmqHandleAllDelayedTask(tmq);
  tmqPollImpl(tmq, timeout);
1846
  rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1847 1848
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1849
  }
1850
#endif
X
Xiaoyu Wang 已提交
1851

1852
  // in no topic status, delayed task also need to be processed
L
Liu Jicong 已提交
1853
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
1854
    tscDebug("consumer:%" PRId64 ", poll return since consumer status is init", tmq->consumerId);
1855 1856 1857
    return NULL;
  }

L
Liu Jicong 已提交
1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }
      tscDebug("consumer not ready, retry");
      taosMsleep(500);
    }
  }

X
Xiaoyu Wang 已提交
1869
  while (1) {
L
Liu Jicong 已提交
1870
    tmqHandleAllDelayedTask(tmq);
L
Liu Jicong 已提交
1871
    if (tmqPollImpl(tmq, timeout) < 0) {
L
Liu Jicong 已提交
1872
      tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId);
L
Liu Jicong 已提交
1873 1874
      /*return NULL;*/
    }
L
Liu Jicong 已提交
1875

1876
    rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1877
    if (rspObj) {
1878
      tscDebug("consumer:%" PRId64 ", return rsp %p", tmq->consumerId, rspObj);
L
Liu Jicong 已提交
1879
      return (TAOS_RES*)rspObj;
L
Liu Jicong 已提交
1880
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
L
Liu Jicong 已提交
1881
      tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId);
L
Liu Jicong 已提交
1882
      return NULL;
X
Xiaoyu Wang 已提交
1883
    }
1884
    if (timeout != -1) {
L
Liu Jicong 已提交
1885 1886 1887 1888 1889
      int64_t currentTime = taosGetTimestampMs();
      int64_t passedTime = currentTime - startTime;
      if (passedTime > timeout) {
        tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
                 tmq->consumerId, tmq->epoch, startTime, currentTime);
X
Xiaoyu Wang 已提交
1890 1891
        return NULL;
      }
L
Liu Jicong 已提交
1892 1893 1894 1895
      /*tscInfo("consumer:%" PRId64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
      /*", left time %" PRId64,*/
      /*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/
      tsem_timewait(&tmq->rspSem, (timeout - passedTime));
L
Liu Jicong 已提交
1896 1897
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
L
Liu Jicong 已提交
1898
      tsem_timewait(&tmq->rspSem, 1000);
X
Xiaoyu Wang 已提交
1899 1900 1901 1902
    }
  }
}

L
Liu Jicong 已提交
1903
int32_t tmq_consumer_close(tmq_t* tmq) {
1904
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
L
Liu Jicong 已提交
1905 1906
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
L
Liu Jicong 已提交
1907
      return rsp;
1908 1909
    }

L
Liu Jicong 已提交
1910
    int32_t     retryCnt = 0;
1911
    tmq_list_t* lst = tmq_list_new();
L
Liu Jicong 已提交
1912 1913 1914 1915 1916 1917 1918 1919 1920 1921
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      } else {
        retryCnt++;
        taosMsleep(500);
      }
    }

1922
    tmq_list_destroy(lst);
L
Liu Jicong 已提交
1923
  }
1924
  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
L
Liu Jicong 已提交
1925
  return 0;
1926
}
L
Liu Jicong 已提交
1927

L
Liu Jicong 已提交
1928 1929
const char* tmq_err2str(int32_t err) {
  if (err == 0) {
L
Liu Jicong 已提交
1930
    return "success";
L
Liu Jicong 已提交
1931
  } else if (err == -1) {
L
Liu Jicong 已提交
1932 1933 1934
    return "fail";
  } else {
    return tstrerror(err);
L
Liu Jicong 已提交
1935 1936
  }
}
L
Liu Jicong 已提交
1937

L
Liu Jicong 已提交
1938 1939 1940 1941 1942
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
1943 1944
  } else if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
L
Liu Jicong 已提交
1945 1946 1947 1948 1949
  } else {
    return TMQ_RES_INVALID;
  }
}

L
Liu Jicong 已提交
1950
const char* tmq_get_topic_name(TAOS_RES* res) {
L
Liu Jicong 已提交
1951 1952
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
L
Liu Jicong 已提交
1953
    return strchr(pRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1954 1955 1956
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->topic, '.') + 1;
1957 1958 1959
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
    return strchr(pRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1960 1961 1962 1963 1964
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1965 1966 1967 1968
const char* tmq_get_db_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return strchr(pRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1969 1970 1971
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->db, '.') + 1;
1972 1973 1974
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
    return strchr(pRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1975 1976 1977 1978 1979
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1980 1981 1982 1983
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
L
Liu Jicong 已提交
1984 1985 1986
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return pMetaRspObj->vgId;
1987
  } else if (TD_RES_TMQ_METADATA(res)) {
1988 1989
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
    return pRspObj->vgId;
L
Liu Jicong 已提交
1990 1991 1992 1993
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
1994 1995 1996 1997 1998 1999 2000 2001

const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
2002
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
2003 2004
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
L
Liu Jicong 已提交
2005 2006 2007
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
2008
    }
L
Liu Jicong 已提交
2009 2010
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
  }
L
Liu Jicong 已提交
2011 2012
  return NULL;
}
2013

L
Liu Jicong 已提交
2014
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
2015
  //
L
Liu Jicong 已提交
2016
  tmqCommitInner(tmq, msg, 0, 1, cb, param);
L
Liu Jicong 已提交
2017 2018
}

2019 2020
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  //
L
Liu Jicong 已提交
2021
  return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
2022
}