clientTmq.c 61.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34
#if 0
#undef tsem_post
#define tsem_post(x)                                         \
  tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \
  sem_post(x)
#endif

X
Xiaoyu Wang 已提交
35
struct SMqMgmt {
36 37 38
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
39
};
L
Liu Jicong 已提交
40

X
Xiaoyu Wang 已提交
41 42
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
43
static struct SMqMgmt tmqMgmt = {0};
44

L
Liu Jicong 已提交
45 46 47 48 49 50
// Common header of the response wrappers in this file. SMqAskEpRspWrapper and
// SMqPollRspWrapper begin with the same two fields, and tmqFreeRspWrapper()
// casts a SMqRspWrapper* to the concrete wrapper after inspecting tmqRspType.
typedef struct {
  int8_t  tmqRspType;  // discriminator, compared against the TMQ_MSG_TYPE__* constants
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
51 52 53
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
54 55
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
56
struct tmq_list_t {
L
Liu Jicong 已提交
57
  SArray container;
L
Liu Jicong 已提交
58
};
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
struct tmq_conf_t {
61 62 63 64 65
  char    clientId[256];
  char    groupId[TSDB_CGROUP_LEN];
  int8_t  autoCommit;
  int8_t  resetOffset;
  int8_t  withTbName;
L
Liu Jicong 已提交
66 67
  int8_t  snapEnable;
  int32_t snapBatchSize;
68
  bool    hbBgEnable;
69

70 71 72 73 74
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
75
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
76
  void*          commitCbUserParam;
L
Liu Jicong 已提交
77 78 79
};

struct tmq_t {
80
  int64_t refId;
L
Liu Jicong 已提交
81
  // conf
82 83 84 85 86 87 88 89 90
  char     groupId[TSDB_CGROUP_LEN];
  char     clientId[256];
  int8_t   withTbName;
  int8_t   useSnapshot;
  int8_t   autoCommit;
  int32_t  autoCommitInterval;
  int32_t  resetOffsetCfg;
  uint64_t consumerId;
  bool     hbBgEnable;
91

L
Liu Jicong 已提交
92 93
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
94 95 96 97

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
98 99
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
100
  int32_t epSkipCnt;
L
Liu Jicong 已提交
101
#endif
L
Liu Jicong 已提交
102 103
  int64_t pollCnt;

L
Liu Jicong 已提交
104
  // timer
105 106
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
107 108 109
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
110 111 112 113
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
114
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
115
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
116
  STaosQall*  qall;
L
Liu Jicong 已提交
117 118 119 120
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
121 122
};

X
Xiaoyu Wang 已提交
123 124 125 126 127 128 129 130
// Values stored in SMqClientVg.vgStatus.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT = 1,
};

// Consumer life-cycle states (NOTE(review): presumably stored in tmq_t.status -- confirm).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY = 1,
  TMQ_CONSUMER_STATUS__NO_TOPIC = 2,
  TMQ_CONSUMER_STATUS__RECOVER = 3,
};

L
Liu Jicong 已提交
135
// Task tags written into tmq_t.delayedTask by the timer callbacks and
// dispatched by tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT = 2,
  TMQ_DELAYED_TASK__COMMIT = 3,
};

L
Liu Jicong 已提交
141
typedef struct {
142 143 144
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
145 146
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
147
  // connection info
148
  int32_t vgId;
X
Xiaoyu Wang 已提交
149
  int32_t vgStatus;
L
Liu Jicong 已提交
150
  int32_t vgSkipCnt;
151 152 153
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
154
typedef struct {
155
  // subscribe info
156 157 158
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
159
  SSchemaWrapper schema;
160 161
} SMqClientTopic;

L
Liu Jicong 已提交
162 163 164 165 166
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
167
  union {
L
Liu Jicong 已提交
168 169
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
170
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
171
  };
L
Liu Jicong 已提交
172 173
} SMqPollRspWrapper;

L
Liu Jicong 已提交
174
typedef struct {
175 176
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
177 178
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
179
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
180

L
Liu Jicong 已提交
181
typedef struct {
182 183
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
184
  int32_t code;
L
Liu Jicong 已提交
185
  int32_t async;
X
Xiaoyu Wang 已提交
186
  tsem_t  rspSem;
187 188
} SMqAskEpCbParam;

L
Liu Jicong 已提交
189
typedef struct {
190 191
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
192
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
193
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
194
  int32_t         vgId;
L
Liu Jicong 已提交
195
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
196
} SMqPollCbParam;
197

198
typedef struct {
199 200
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
201 202
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
203 204
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
205
  int32_t        rspErr;
206
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
207 208 209 210
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
211 212 213 214 215
} SMqCommitCbParamSet;

// Per-request callback context of a single offset-commit message.
typedef struct {
  SMqCommitCbParamSet* params;   // set this request belongs to
  STqOffset*           pOffset;  // offset that was encoded into the request; freed in tmqCommitCb()
  /*char                 topicName[TSDB_TOPIC_FNAME_LEN];*/
  /*int32_t              vgId;*/
} SMqCommitCbParam;
219

220 221
static int32_t tmqAskEp(tmq_t* tmq, bool async);

222
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
223
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
224 225 226 227 228
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

229
  conf->withTbName = false;
L
Liu Jicong 已提交
230
  conf->autoCommit = true;
L
Liu Jicong 已提交
231
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
232
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
233
  conf->hbBgEnable = true;
234

235 236 237
  return conf;
}

L
Liu Jicong 已提交
238
// Releases a configuration and the connection strings it owns.
// A NULL conf is accepted and ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip != NULL) {
    taosMemoryFree(conf->ip);
  }
  if (conf->user != NULL) {
    taosMemoryFree(conf->user);
  }
  if (conf->pass != NULL) {
    taosMemoryFree(conf->pass);
  }
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
248
  if (strcmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
249
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
250
    return TMQ_CONF_OK;
251
  }
L
Liu Jicong 已提交
252

253
  if (strcmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
254
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
255 256
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
257

L
Liu Jicong 已提交
258 259
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
260
      conf->autoCommit = true;
L
Liu Jicong 已提交
261 262
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
263
      conf->autoCommit = false;
L
Liu Jicong 已提交
264 265 266 267
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
268
  }
L
Liu Jicong 已提交
269

L
Liu Jicong 已提交
270 271 272 273 274
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
289

290 291
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
292
      conf->withTbName = true;
L
Liu Jicong 已提交
293
      return TMQ_CONF_OK;
294
    } else if (strcmp(value, "false") == 0) {
295
      conf->withTbName = false;
L
Liu Jicong 已提交
296
      return TMQ_CONF_OK;
297 298 299 300 301
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
302
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
303
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
304
      conf->snapEnable = true;
305 306
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
307
      conf->snapEnable = false;
308 309 310 311 312 313
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
314 315 316 317 318
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

319 320 321
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
322 323
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
324
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
325 326 327 328
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
329
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
330 331
  }

L
Liu Jicong 已提交
332
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
333 334 335
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
336
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
337 338 339
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
340
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
341 342 343
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
344
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
345 346 347
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
348
  if (strcmp(key, "td.connect.db") == 0) {
349
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
350 351 352
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
353
  return TMQ_CONF_UNKNOWN;
354 355 356
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
357 358
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
359 360
}

L
Liu Jicong 已提交
361 362
// Appends a heap copy of src to list. Names that do not start with a
// backquote are lower-cased in the copy.
//
// Returns 0 on success and -1 when list is NULL, src is NULL or empty, or
// memory cannot be allocated; nothing is leaked on the failure paths.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  if (topic[0] != '`') {
    strtolower(topic, src);
  }
  if (taosArrayPush(container, &topic) == NULL) {
    // the array did not take ownership, so release the copy here
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
372
// Destroys the list and frees every topic string stored in it.
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
377 378 379 380 381 382 383 384 385 386
// Returns the number of topic names stored in list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Exposes the list's backing storage as an array of C strings. The returned
// pointer is the array's own data buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* pArray = &list->container;
  char**        items = pArray->pData;
  return items;
}

L
Liu Jicong 已提交
387 388 389 390
// Formats "<topicName>:<vg>" into dst and returns the number of characters
// written, excluding the terminating NUL. dst must be large enough; the
// write is not bounded.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
// Finishes a commit once the last outstanding response has been counted.
//
// Async commits: invokes the consumer-level callback (automatic commits) or
// the user callback (manual commits) with pParamSet->rspErr, then frees the
// set. Blocking commits: posts rspSem; the waiting caller frees the set.
//
// Returns 0 on success. Returns -1 with terrno set to
// TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer can no longer be resolved;
// the set is freed in that case.
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // keep a copy: the set may be freed before the reference is released below
  int64_t refId = pParamSet->refId;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    // NOTE(review): for a blocking commit the caller may still be waiting on
    // rspSem and will read the set afterwards -- confirm that a consumer
    // cannot be closed while a blocking commit is in flight.
    if (!pParamSet->async) {
      tsem_destroy(&pParamSet->rspSem);
    }
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  if (pParamSet->async) {
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    taosMemoryFree(pParamSet);
  } else {
    // the set belongs to the blocked caller from here on
    tsem_post(&pParamSet->rspSem);
  }

  // balance the taosAcquireRef() above so the consumer can be reclaimed
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}

L
Liu Jicong 已提交
423 424
// Atomically decrements the number of outstanding commit responses and
// completes the commit when it reaches zero. A negative count is logged and
// otherwise ignored.
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);

  // NOTE(review): ASSERT() appears to evaluate to true when the condition is violated -- confirm
  if (ASSERT(waitingRspNum >= 0)) {
    tscError("tmqCommitRspCountDown error:%d", waitingRspNum);
    return;
  }

  if (waitingRspNum != 0) {
    return;
  }
  tmqCommitDone(pParamSet);
}

434 435
// Response handler of a single offset-commit request.
//
// Records a failure code in the shared parameter set so that the blocking
// caller or the commit callback sees it (previously the code was dropped and
// every commit reported success), releases the per-request buffers, and
// counts the response down. Always returns 0.
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  if (code != 0) {
    // must happen before the count-down: the set may be freed by it
    pParamSet->rspErr = code;
  }

  taosMemoryFree(pParam->pOffset);
  if (pBuf != NULL) {
    taosMemoryFree(pBuf->pData);
    taosMemoryFree(pBuf->pEpSet);
  }

  tmqCommitRspCountDown(pParamSet);

  return 0;
}

L
Liu Jicong 已提交
458 459 460 461 462 463
// Builds and sends one TDMT_VND_TMQ_COMMIT_OFFSET request carrying
// pVg->currentOffset for the (consumer group, topic) pair.
//
// On success the request is registered in pParamSet (waitingRspNum and
// totalRspNum are incremented), pVg->committedOffset is advanced, and 0 is
// returned; the response is handled by tmqCommitCb(). On failure -1 is
// returned, nothing has been sent, pParamSet is untouched and every
// intermediate allocation is released.
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffset;

  // subKey is "<groupId><TMQ_SEPARATOR><topicName>"
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    taosMemoryFree(pOffset);  // was leaked on this path
    return -1;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64, tmq->consumerId, pOffset->subKey, pVg->vgId,
           pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;

  // register the request before it goes out so the response can count it down
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;
}

// Commits the offset of the (topic, vgroup) that produced msg.
//
// msg must be a TMQ data, meta or metadata result; otherwise
// TSDB_CODE_TMQ_INVALID_MSG is returned. Nothing is sent when the vgroup is
// not found or its current offset is unset or already committed; 0 is
// returned then.
//
// Blocking (async == 0): waits for the response and returns its code, or -1
// when the request could not be built.
// Async: returns 0 once the request is dispatched; the result is reported
// through userCb by tmqCommitDone(). When the request could not be built,
// userCb (if set) is invoked with -1 and 0 is returned.
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // Track the outcome locally: once a request is out, an async completion
  // may free pParamSet on another thread, so it must not be read again.
  bool sent = false;
  bool failed = false;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics) && !sent && !failed; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;

    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;
      if (pVg->currentOffset.type <= 0 || tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) continue;

      if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
        failed = true;
      } else {
        sent = true;
      }
      break;
    }
  }

  if (!sent) {
    // nothing reached the transport, so the set is still exclusively ours
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    if (!failed) return 0;

    if (async) {
      if (userCb != NULL) userCb(tmq, -1, userParam);
      return 0;
    }
    return -1;
  }

  if (async) {
    // the set is now owned by tmqCommitDone()
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

L
Liu Jicong 已提交
613 614
// Commits the current offset of every vgroup of every subscribed topic whose
// offset is set and differs from the last committed one.
//
// Blocking (async == 0): waits until all responses have arrived and returns
// pParamSet->rspErr.
// Async: returns 0 once the requests are dispatched; completion is reported
// by tmqCommitDone() through tmq->commitCb (automatic) or userCb (manual).
//
// Returns 0 without invoking any callback when there was nothing to commit,
// and -1 when the parameter set cannot be allocated (the matching callback,
// if set, is invoked with TSDB_CODE_OUT_OF_MEMORY for async commits).
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                                     void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    if (async) {
      if (automatic) {
        if (tmq->commitCb != NULL) tmq->commitCb(tmq, TSDB_CODE_OUT_OF_MEMORY, tmq->commitCbUserParam);
      } else if (userCb != NULL) {
        userCb(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
      }
    }
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue: responses cannot drive the
  // counter to zero while requests are still being issued
  pParamSet->waitingRspNum = 1;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId,
                 pTopic->topicName, pVg->vgId, pVg->currentOffset.version, pVg->committedOffset.version);
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
      }
    }
  }

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // count down since waiting rsp num init as 1
  tmqCommitRspCountDown(pParamSet);

  if (async) {
    // the set may already have been freed by tmqCommitDone(); do not touch it
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

// Dispatches a commit: with a message, only the (topic, vgroup) that produced
// it is committed; without one, every subscribed topic is committed.
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (!msg) {
    return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
  }
  return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
}

694
void tmqAssignAskEpTask(void* param, void* tmrId) {
695 696 697
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
698
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
699 700 701 702 703
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
704 705 706
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
707 708 709
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
710
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
711 712 713 714 715
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
716 717 718
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
719 720 721
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
722
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
723 724 725 726 727
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
728 729
}

730
// Response handler of a heartbeat request: the reply carries nothing the
// consumer needs, so its buffers are simply released. Always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
739 740 741
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
742
    taosMemoryFree(param);
743 744
    return;
  }
D
dapan1121 已提交
745 746 747 748 749

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
750
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
751 752 753 754
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
    return;
  }
L
Liu Jicong 已提交
755
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
756 757 758 759 760 761 762 763 764
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
    return;
  }
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
    return;
  }
765 766 767 768

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
769
    goto OVER;
770 771 772
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
D
dapan1121 已提交
773
      .len = tlen,
774 775 776 777 778 779 780
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
781
  sendInfo->msgType = TDMT_MND_TMQ_HB;
782 783 784 785 786 787 788

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
789
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
790 791
}

792
// Drains the consumer's delayed-task queue and runs each task:
//   ASK_EP -> asks for endpoints asynchronously and re-arms the ask-ep timer (1000 ms)
//   COMMIT -> starts an automatic async commit and re-arms the commit timer
//             (autoCommitInterval ms)
//   REPORT -> no action
//
// Returns 0 on success (including an empty queue) and -1 with terrno set when
// the batch container cannot be allocated. If the refId holder for a timer
// cannot be allocated, the error is logged and that timer is not re-armed.
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(pTmq, true);

      // the timer callback owns and frees this holder
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      } else {
        tscError("consumer:0x%" PRIx64 " failed to allocate refId for ask-ep timer", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);

      // the timer callback owns and frees this holder
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next commit to mnode in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      } else {
        tscError("consumer:0x%" PRIx64 " failed to allocate refId for commit timer", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do for report tasks
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860
// Releases the payload owned by a queued response wrapper, chosen by its
// tmqRspType. The wrapper itself is not freed here. Unrecognised types and
// TMQ_MSG_TYPE__END_RSP own nothing and are left untouched.
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  const int8_t rspType = rspWrapper->tmqRspType;

  if (rspType == TMQ_MSG_TYPE__END_RSP) {
    return;
  }

  if (rspType == TMQ_MSG_TYPE__EP_RSP) {
    tDeleteSMqAskEpRsp(&((SMqAskEpRspWrapper*)rspWrapper)->msg);
    return;
  }

  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
    taosArrayDestroyP(pPoll->dataRsp.blockData, taosMemoryFree);
    taosArrayDestroy(pPoll->dataRsp.blockDataLen);
    taosArrayDestroyP(pPoll->dataRsp.blockTbName, taosMemoryFree);
    taosArrayDestroyP(pPoll->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
    return;
  }

  if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
    taosMemoryFree(pPoll->metaRsp.metaRsp);
    return;
  }

  if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
    taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
    taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
    taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
    taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
    // taosx-only members
    taosArrayDestroy(pPoll->taosxRsp.createTableLen);
    taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
  }
}

L
Liu Jicong 已提交
861
// Free every response wrapper currently staged in tmq->qall, including the
// payload each wrapper owns.
static void tmqFreeStagedRsp(tmq_t* tmq) {
  for (;;) {
    // Reset on every iteration: the previous item has just been freed, so the
    // loop must not depend on taosGetQitem() overwriting the out pointer when
    // nothing is left (that would turn into a double free).
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
      break;
    }
    tmqFreeRspWrapper(rspWrapper);
    taosFreeQitem(rspWrapper);
  }
}

// Discard all responses the consumer has not handled yet: first the ones
// already staged in tmq->qall, then whatever is still pending in tmq->mqueue
// (moved into qall with taosReadAllQitems and freed the same way).
void tmqClearUnhandleMsg(tmq_t* tmq) {
  tmqFreeStagedRsp(tmq);

  taosReadAllQitems(tmq->mqueue, tmq->qall);
  tmqFreeStagedRsp(tmq);
}

D
dapan1121 已提交
886
// Response callback for the subscribe request sent by tmq_subscribe():
// records the response code and wakes the waiting caller.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pCbParam = param;

  pCbParam->rspErr = code;
  taosMemoryFree(pMsg->pEpSet);

  // Post last: tmq_subscribe() keeps the param on its stack and destroys the
  // semaphore as soon as its wait returns.
  tsem_post(&pCbParam->rspSem);
  return 0;
}
894

L
Liu Jicong 已提交
895
// Append the names of the topics the consumer currently tracks to *topics.
//
// If *topics is NULL a new list is created with tmq_list_new() and stored in
// *topics. Stored topic names are full names; the part after the first '.'
// is what gets appended. A name without a '.' is appended unchanged.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the list cannot be created.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      // NOTE(review): assumes tmq_list_new() reports failure by returning NULL
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr() returns NULL when there is no '.'; "NULL + 1" must never be
    // handed to tmq_list_append().
    char* dot = strchr(topic->topicName, '.');
    char* shortName = (dot != NULL) ? dot + 1 : topic->topicName;
    tmq_list_append(*topics, shortName);
  }
  return 0;
}

L
Liu Jicong 已提交
906
// Unsubscribe by subscribing to an empty topic list. While the server answers
// TSDB_CODE_MND_CONSUMER_NOT_READY the request is retried every 500 ms until
// the retry counter exceeds 5. Returns the code of the last attempt.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* emptyList = tmq_list_new();
  int32_t     code;
  int32_t     retries = 0;

  for (;;) {
    code = tmq_subscribe(tmq, emptyList);
    if (code == TSDB_CODE_MND_CONSUMER_NOT_READY && retries <= 5) {
      retries++;
      taosMsleep(500);
      continue;
    }
    break;
  }

  tmq_list_destroy(emptyList);
  return code;
}

924 925
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
926

927
  // TODO stop timer
L
Liu Jicong 已提交
928 929 930 931
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
932
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
L
Liu Jicong 已提交
933
  taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
934

935
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
936

937 938 939
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
940
    taosMemoryFreeClear(pTopic->schema.pSchema);
941 942 943 944 945
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
946 947
}

948 949 950 951 952 953 954 955 956
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
957
  if (tmqMgmt.rsetId < 0) {
958 959 960 961
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
962
// Create a consumer from `conf`.
//
// Steps: one-time global init, allocate the handle and its queues, copy the
// configuration, generate a consumer id, init the response semaphore, open
// the server connection, register the handle in the TMQ reference set and,
// if conf->hbBgEnable is set, start the background heartbeat timer.
//
// Returns the new handle, or NULL on failure. terrno is set explicitly for
// init and allocation failures.
// errstr / errstrLen are accepted for API compatibility; this function does
// not write to them.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
  int64_t*    pHbRefId = NULL;  // timer parameter for the heartbeat, owned here until the timer starts

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId has not been filled in yet, so report the configured group
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             conf->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf; bounded copies so an oversized id can never overrun the handle
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // Allocate the heartbeat timer parameter up front: failing here only needs
  // the plain FAIL cleanup, whereas failing after registration would not.
  if (pTmq->hbBgEnable) {
    pHbRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pHbRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      goto FAIL;
    }
  }

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    taosMemoryFree(pHbRefId);
    // everything is initialised by now, so the regular destructor applies
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pTmq->hbBgEnable) {
    *pHbRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pHbRefId, tmqMgmt.timer);
  }

  tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId);
  return pTmq;

FAIL:
  // only reached before the connection exists and before registration
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pHbRefId);
  taosMemoryFree(pTmq);

  return NULL;
}

L
Liu Jicong 已提交
1055
// Subscribe the consumer to the topics in `topic_list` (an empty list
// unsubscribes).
//
// Builds and serializes an SCMSubscribeReq, sends it to the mnode, blocks
// until tmqSubscribeCb() posts the response, then asks for the endpoint
// assignment (retrying every 500 ms while the mnode reports
// TSDB_CODE_MND_CONSUMER_NOT_READY) and finally starts the ask-ep timer and,
// when auto commit is enabled, the commit timer.
//
// Returns 0 on success or a non-zero error code. Every failure path sets
// `code`, so a failed subscribe is never reported as success.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 " tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // expand every topic into its full name; the array owns the strings
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 ", subscribe topic: %s", tmq->consumerId, topicFName);

    taosArrayPush(req.topicNames, &topicFName);
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // lives on this stack frame; the callback must finish with it before the
  // wait below returns
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = -1;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // still not ready after all retries: report it instead of returning 0
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  // shared exit for success and failure; buf/sendInfo are NULL once handed
  // to the transport
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1179
// Store the commit callback and its user parameter in the configuration;
// tmq_consumer_new() copies both into the consumer it creates.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1184

D
dapan1121 已提交
1185
// Response callback for a poll request sent to one vgroup.
//
// Takes ownership of `param` (SMqPollCbParam) and of pMsg->pData /
// pMsg->pEpSet. On success the decoded response is wrapped in an
// SMqPollRspWrapper, written to tmq->mqueue, and tmq->rspSem is posted.
// On failure the vgroup is put back to idle (only if the request epoch is
// still current) and the semaphore is posted as well.
//
// The consumer is pinned with taosAcquireRef() for the duration of the
// callback; every exit path after a successful acquire releases it again.
//
// Returns 0 when the message was queued or deliberately dropped as stale,
// -1 otherwise.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  int64_t         refId = pParam->refId;  // saved: pParam is freed before the release calls

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    // consumer already closed: nothing to deliver to, just drop everything
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
  taosMemoryFree(pParam);
  if (code != 0) {
    // report the code handed to this callback, not whatever terrno holds
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, tstrerror(code));
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pRspWrapper == NULL) {
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      // the end marker carries no vg/topic handle
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosReleaseRef(tmqMgmt.rsetId, refId);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // Decode the body that follows the SMqRspHead, then copy the head itself
  // over the start of the decoded struct.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:0x%" PRIx64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;

CREATE_MSG_FAIL:
  // pVg is only touched while the request epoch is still the current one
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return -1;
}

L
Liu Jicong 已提交
1300
// Replace the consumer's topic/vgroup table with the assignment in `pRsp`
// and move the consumer to `epoch`.
//
// Offsets already reached on vgroups that survive the update are carried
// over: they are stashed in a hash keyed by "<topicName>:<vgId>" and looked
// up while the new table is built. Vgroups not seen before start from
// tmq->resetOffsetCfg. Schema ownership is moved out of pRsp (the pointers in
// pRsp are cleared), which is why `pRsp` is modified despite the const
// qualifier on the top-level struct.
//
// Returns true if at least one vgroup was installed, false otherwise
// (including allocation failure, in which case nothing is changed).
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);

  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
           tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // remember the current offset of every vgroup we are consuming from
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        // bounded formatting: the key can never overrun vgKey
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, sizeof(buf), &pVgCur->currentOffset);
        tscDebug("consumer:0x%" PRIx64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  // build the new table from the response
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    // take over the schema so it is not freed together with the response
    topic.schema = pTopicEp->schema;
    pTopicEp->schema.nCols = 0;
    pTopicEp->schema.pSchema = NULL;
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:0x%" PRIx64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // destroy current buffered existed topics info
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
      taosArrayDestroy(pTopic->vgs);
    }

    taosArrayDestroy(tmq->clientTopics);
  }

  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  }

  // publish the epoch last, after the table it describes is in place
  atomic_store_32(&tmq->epoch, epoch);
  tscDebug("consumer:0x%" PRIx64 ", update topic info completed", tmq->consumerId);
  return set;
}

D
dapan1121 已提交
1401
// Response callback for the ask-ep request sent by tmqAskEp().
//
// Owns pMsg->pData and pMsg->pEpSet. `param` (SMqAskEpCbParam) is owned by
// the waiting caller in sync mode, which is woken through pParam->rspSem, and
// by this callback in async mode, which frees it.
//
// A response whose epoch is not newer than the consumer's is ignored. A newer
// one is applied immediately in sync mode (tmqUpdateEp) or queued on
// tmq->mqueue as a TMQ_MSG_TYPE__EP_RSP item in async mode.
//
// The consumer is pinned with taosAcquireRef() for the duration of the
// callback and released on every exit path after a successful acquire.
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  int64_t          refId = pParam->refId;  // saved: pParam may be gone before the release
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  if (tmq == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (!async) {
      // tmqAskEp() is blocked on this semaphore and owns pParam: hand it the
      // error and wake it up rather than destroying the semaphore under it.
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, async,
             tstrerror(code));
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  if (head->epoch <= epoch) {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
             tmq->consumerId, head->epoch, epoch);
    goto END;
  }

  tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
           head->epoch, epoch);

  if (!async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    // head first, then the decoded body on top of it; keep this order
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // after this post the waiter may free pParam at any time
    tsem_post(&pParam->rspSem);
  } else {
    taosMemoryFree(pParam);
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return code;
}

L
Liu Jicong 已提交
1476
// Ask the mnode for the consumer's current endpoint (topic/vgroup) assignment.
//
// async == false: blocks until tmqAskEpCb() posts the response and returns
//                 the code the callback stored in the parameter block.
// async == true : returns right after the request is handed to the
//                 transport; tmqAskEpCb() owns and frees the parameter block.
//
// Returns 0 (TSDB_CODE_SUCCESS) when the request was sent (async) or answered
// successfully (sync), -1 if the request could not be built, otherwise the
// response code.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = TSDB_CODE_SUCCESS;

  SMqAskEpReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
  // bounded copy into the fixed-size request field
  tstrncpy(req.cgroup, tmq->groupId, sizeof(req.cgroup));

  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
    return -1;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
    taosMemoryFree(pReq);
    return -1;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
    taosMemoryFree(pReq);
    return -1;
  }

  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;

  // The semaphore is only ever waited on in sync mode; the async callback
  // frees pParam without touching it, so it is not created there.
  if (!async && tsem_init(&pParam->rspSem, 0, 0) != 0) {
    tscError("consumer:0x%" PRIx64 ", failed to init ask ep semaphore", tmq->consumerId);
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    if (!async) {
      tsem_destroy(&pParam->rspSem);
    }
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = tmq->consumerId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d", tmq->consumerId, async);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // sync mode: this function owns pParam; wait for the callback, then
    // tear the semaphore down before releasing the block that contains it
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  return code;
}

L
Liu Jicong 已提交
1561
// Fill in a poll request for one vgroup of one topic.
// The subscription key is "<groupId><TMQ_SEPARATOR><topicName>"; the request
// offset is the vgroup's current offset.
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // subscription key: group id, separator, topic name
  int32_t prefixLen = strlen(tmq->groupId);
  char*   subKey = pReq->subKey;
  memcpy(subKey, tmq->groupId, prefixLen);
  subKey[prefixLen] = TMQ_SEPARATOR;
  strcpy(subKey + prefixLen + 1, pTopic->topicName);

  // consumer-level settings
  pReq->withTbName = tmq->withTbName;
  pReq->timeout = timeout;
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;

  // per-request / per-vgroup values
  pReq->reqOffset = pVg->currentOffset;
  pReq->reqId = generateRequestId();

  pReq->useSnapshot = tmq->useSnapshot;

  pReq->head.vgId = pVg->vgId;
}

L
Liu Jicong 已提交
1583 1584
// Build a user-facing meta response object from a queued poll wrapper.
// The meta response struct is copied bytewise, so pointers inside it are
// shared with the wrapper rather than duplicated.
// Returns NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if allocation fails.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1594 1595 1596
// Build a user-facing data response object from a queued poll wrapper.
// The data response struct is copied bytewise, so the arrays it points to are
// shared with the wrapper rather than duplicated. When the response carries
// no schema of its own, the topic's schema is installed in the result info.
// Returns NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if allocation fails.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;  // no block has been iterated yet
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1612 1613
// Build a user-facing taosx (meta + data) response object from a queued poll
// wrapper. The taosx response struct is copied bytewise, so the arrays it
// points to are shared with the wrapper rather than duplicated. When the
// response carries no schema of its own, the topic's schema is installed in
// the result info.
// Returns NULL (terrno = TSDB_CODE_OUT_OF_MEMORY) if allocation fails.
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;  // no block has been iterated yet
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1630
// broadcast the poll request to all related vnodes
1631
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1632
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1633
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
1634 1635

  for (int i = 0; i < numOfTopics; i++) {
X
Xiaoyu Wang 已提交
1636 1637 1638 1639
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1640
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1641
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
X
Xiaoyu Wang 已提交
1642 1643
        tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
                 pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1644
        continue;
L
Liu Jicong 已提交
1645
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1646 1647 1648 1649
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
1650
        tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1651 1652
        }
#endif
X
Xiaoyu Wang 已提交
1653
      }
1654

L
Liu Jicong 已提交
1655
      atomic_store_32(&pVg->vgSkipCnt, 0);
D
dapan1121 已提交
1656 1657 1658 1659 1660 1661 1662 1663 1664

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
1665

L
Liu Jicong 已提交
1666
      char* msg = taosMemoryCalloc(1, msgSize);
D
dapan1121 已提交
1667 1668 1669 1670 1671
      if (NULL == msg) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
L
Liu Jicong 已提交
1672

D
dapan1121 已提交
1673 1674
      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1675
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1676
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1677 1678
        return -1;
      }
L
Liu Jicong 已提交
1679

wafwerar's avatar
wafwerar 已提交
1680
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1681
      if (pParam == NULL) {
D
dapan1121 已提交
1682
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1683
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1684
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1685 1686
        return -1;
      }
1687

1688 1689 1690
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;

L
Liu Jicong 已提交
1691
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1692
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1693
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1694

1695
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1696
      if (sendInfo == NULL) {
D
dapan1121 已提交
1697
        taosMemoryFree(msg);
wafwerar's avatar
wafwerar 已提交
1698
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1699
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1700
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1701 1702 1703 1704
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
D
dapan1121 已提交
1705 1706
          .pData = msg,
          .len = msgSize,
X
Xiaoyu Wang 已提交
1707 1708
          .handle = NULL,
      };
1709

D
dapan1121 已提交
1710
      sendInfo->requestId = req.reqId;
X
Xiaoyu Wang 已提交
1711
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1712
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1713
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1714
      sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
X
Xiaoyu Wang 已提交
1715 1716

      int64_t transporterId = 0;
L
Liu Jicong 已提交
1717

1718
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1719
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
1720 1721

      tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
D
dapan1121 已提交
1722
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
X
Xiaoyu Wang 已提交
1723
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
1724

X
Xiaoyu Wang 已提交
1725 1726 1727 1728
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
1729

X
Xiaoyu Wang 已提交
1730 1731 1732
  return 0;
}

L
Liu Jicong 已提交
1733 1734
/*
 * Handle a queued response that is not a poll response.
 *
 * Only TMQ_MSG_TYPE__EP_RSP is understood. When the epoch carried by the
 * wrapper is newer than the consumer's epoch, the endpoint information is
 * applied with tmqUpdateEp, the response message is released and *pReset is
 * set to true. Otherwise the wrapper content is freed and *pReset is set to
 * false.
 *
 * Returns 0 when the wrapper was an EP response, -1 for any other type (in
 * which case *pReset is left untouched).
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    // not newer than what the consumer already has: drop it
    tmqFreeRspWrapper(rspWrapper);
    *pReset = false;
    return 0;
  }

  SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
  tmqUpdateEp(tmq, rspWrapper->epoch, pEpRsp);
  tDeleteSMqAskEpRsp(pEpRsp);
  *pReset = true;
  return 0;
}

L
Liu Jicong 已提交
1753
/*
 * Consume queued responses until one of them produces a result object.
 *
 * Items are taken from tmq->qall; when it is empty it is refilled once from
 * tmq->mqueue. Per response type:
 *   - END_RSP:            the item is freed, terrno is set to
 *                         TSDB_CODE_TQ_NO_COMMITTED_OFFSET and NULL is returned.
 *   - POLL_RSP/TAOSX_RSP: on matching epoch the vgroup offset is advanced and
 *                         the vgroup goes back to IDLE; an empty response
 *                         (blockNum == 0) is dropped, otherwise a result object
 *                         is built and returned.
 *   - POLL_META_RSP:      same, but without the empty-response check.
 *   - anything else:      passed to tmqHandleNoPollRsp; if that reports a reset
 *                         and pollIfReset is set, a new poll is issued.
 * Poll responses whose epoch differs from the consumer's epoch are discarded.
 *
 * Returns the result object (SMqRspObj, SMqMetaRspObj or SMqTaosxRspObj), or
 * NULL when both queues are empty or an END_RSP was seen.
 */
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      // local batch exhausted: pull everything that arrived meanwhile and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
    }
    if (pItem == NULL) {
      return NULL;
    }

    tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, pItem);

    int8_t rspType = pItem->tmqRspType;

    if (rspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId);

      int32_t curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->dataRsp.head.epoch != curEpoch) {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pPoll->dataRsp.head.epoch, curEpoch);
        tmqFreeRspWrapper(pItem);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffset = pPoll->dataRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->dataRsp.blockNum == 0) {
        // nothing to hand to the user, look at the next item
        taosFreeQitem(pPoll);
        continue;
      }

      SMqRspObj* pRsp = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pRsp;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;

      int32_t curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->metaRsp.head.epoch != curEpoch) {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pPoll->metaRsp.head.epoch, curEpoch);
        tmqFreeRspWrapper(pItem);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffset = pPoll->metaRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pRsp;
    }

    if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;

      int32_t curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->taosxRsp.head.epoch != curEpoch) {
        tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pPoll->taosxRsp.head.epoch, curEpoch);
        tmqFreeRspWrapper(pItem);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVg = pPoll->vgHandle;
      pVg->currentOffset = pPoll->taosxRsp.rspOffset;
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->taosxRsp.blockNum == 0) {
        // nothing to hand to the user, look at the next item
        taosFreeQitem(pPoll);
        continue;
      }

      // without created tables the response is exposed as a plain data result
      void* pRsp = NULL;
      if (pPoll->taosxRsp.createTableNum == 0) {
        pRsp = tmqBuildRspFromWrapper(pPoll);
      } else {
        pRsp = tmqBuildTaosxRspFromWrapper(pPoll);
      }
      taosFreeQitem(pPoll);
      return pRsp;
    }

    // every remaining type goes through the non-poll handler
    bool reset = false;
    tmqHandleNoPollRsp(tmq, pItem, &reset);
    taosFreeQitem(pItem);
    if (pollIfReset && reset) {
      tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1862
/*
 * Poll the subscribed topics for the next result.
 *
 * timeout is compared against differences of taosGetTimestampMs() values, i.e.
 * it is expressed in milliseconds; -1 makes the call loop until a result or a
 * terminating condition shows up.
 *
 * Returns the next result object, or NULL when
 *   - the consumer is still in TMQ_CONSUMER_STATUS__INIT,
 *   - the consumer is recovering and the endpoint query keeps answering
 *     "not ready" after the retries are used up,
 *   - terrno equals TSDB_CODE_TQ_NO_COMMITTED_OFFSET after handling responses,
 *   - the timeout elapsed without any result.
 *
 * NOTE(review): terrno is not cleared before tmqHandleAllRsp is called, so a
 * value left over from an earlier call could trigger the "no committed offset"
 * exit -- confirm whether that is intended.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime);

  // a consumer in INIT status returns right away, nothing else is processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 ", poll return since consumer status is init", tmq->consumerId);
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    // keep asking for endpoints while the server reports "not ready"
    for (int32_t retryCnt = 0; tmqAskEp(tmq, false) == TSDB_CODE_MND_CONSUMER_NOT_READY;) {
      if (retryCnt > 10) {
        return NULL;
      }
      retryCnt++;

      tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/10 in 500ms", tmq->consumerId, retryCnt);
      taosMsleep(500);
    }
  }

  for (;;) {
    tmqHandleAllDelayedTask(tmq);

    if (tmqPollImpl(tmq, timeout) < 0) {
      // only logged: the loop carries on and still looks for queued responses
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    void* rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj != NULL) {
      tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    }
    if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout == -1) {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
      continue;
    }

    int64_t currentTime = taosGetTimestampMs();
    int64_t passedTime = currentTime - startTime;
    if (passedTime > timeout) {
      tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
               tmq->consumerId, tmq->epoch, startTime, currentTime);
      return NULL;
    }
    // wait for a response, but never longer than what is left of the timeout
    tsem_timewait(&tmq->rspSem, (timeout - passedTime));
  }
}

L
Liu Jicong 已提交
1931
/*
 * Close a consumer.
 *
 * When the consumer is in TMQ_CONSUMER_STATUS__READY, the current offsets are
 * committed synchronously and the consumer then subscribes to an empty topic
 * list, retrying every 500ms while the server answers
 * TSDB_CODE_MND_CONSUMER_NOT_READY. Finally the consumer's reference is removed
 * from the tmq reference set.
 *
 * Returns the error of the synchronous commit when that fails (the reference is
 * NOT removed in that case), otherwise 0.
 *
 * NOTE(review): the result of tmq_list_new() is not checked and the final
 * result of tmq_subscribe() is ignored -- confirm both are intended.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  // read the status atomically, the same way tmq_consumer_poll does
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    for (;;) {
      // subscribing to the empty list drops every current subscription
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }
      retryCnt++;
      taosMsleep(500);
    }

    tmq_list_destroy(lst);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1955

L
Liu Jicong 已提交
1956 1957
/*
 * Translate a tmq error code into a message.
 *
 * 0 maps to "success" and -1 to "fail"; every other value is forwarded to
 * tstrerror.
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1965

L
Liu Jicong 已提交
1966 1967 1968 1969 1970
/*
 * Classify a result object returned by the consumer.
 *
 * Returns TMQ_RES_DATA, TMQ_RES_TABLE_META or TMQ_RES_METADATA depending on
 * which TD_RES_TMQ* check matches, and TMQ_RES_INVALID when none does.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  tmq_res_t type = TMQ_RES_INVALID;

  if (TD_RES_TMQ(res)) {
    type = TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
    type = TMQ_RES_TABLE_META;
  } else if (TD_RES_TMQ_METADATA(res)) {
    type = TMQ_RES_METADATA;
  }

  return type;
}

L
Liu Jicong 已提交
1978
/*
 * Return the topic name of a tmq result.
 *
 * The name stored in the result object is stripped of everything up to and
 * including its first '.'. If the stored name contains no '.', it is returned
 * unchanged (previously this case produced an invalid pointer).
 *
 * Returns NULL when res is not a tmq result. The returned pointer refers to
 * storage inside res.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fname = NULL;

  if (TD_RES_TMQ(res)) {
    fname = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fname = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fname = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fname, '.');
  return (dot != NULL) ? dot + 1 : fname;
}

L
Liu Jicong 已提交
1993 1994 1995 1996
/*
 * Return the database name of a tmq result.
 *
 * The name stored in the result object is stripped of everything up to and
 * including its first '.'. If the stored name contains no '.', it is returned
 * unchanged (previously this case produced an invalid pointer).
 *
 * Returns NULL when res is not a tmq result. The returned pointer refers to
 * storage inside res.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fname = NULL;

  if (TD_RES_TMQ(res)) {
    fname = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fname = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fname = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fname, '.');
  return (dot != NULL) ? dot + 1 : fname;
}

L
Liu Jicong 已提交
2008 2009 2010 2011
/*
 * Return the vgroup id recorded in a tmq result, or -1 when res is not a tmq
 * result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
2022 2023 2024 2025 2026 2027 2028 2029

/*
 * Return the table name of the block the result iterator currently points at.
 *
 * Only data and metadata results are supported. NULL is returned when the
 * response carries no table names, when the iterator is outside
 * [0, blockNum), or when res is of any other kind.
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       hasNames = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
    bool       inRange = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
    if (hasNames && inRange) {
      return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    bool            hasNames = pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL;
    bool            inRange = pTaosx->resIter >= 0 && pTaosx->resIter < pTaosx->rsp.blockNum;
    if (hasNames && inRange) {
      return (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter);
    }
    return NULL;
  }

  return NULL;
}
2041

L
Liu Jicong 已提交
2042
/*
 * Start a commit and return without handing any result back to the caller; cb
 * and param are passed through to tmqCommitInner.
 *
 * NOTE(review): the literals 0 and 1 are presumably the "automatic" and "async"
 * flags of tmqCommitInner -- confirm against its declaration.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

2047 2048
/*
 * Commit through tmqCommitInner without a callback and return its result code.
 *
 * NOTE(review): the two literal 0 arguments are presumably the "automatic" and
 * "async" flags of tmqCommitInner -- confirm against its declaration.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}