clientTmq.c 61.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25
#include "tref.h"
L
Liu Jicong 已提交
26 27
#include "ttimer.h"

L
Liu Jicong 已提交
28 29 30 31 32 33 34
#if 0
// Debug aid, compiled out: logs the calling function and line before each semaphore post.
// NOTE(review): the expansion is two separate statements, so enabling it would change the
// meaning of any un-braced `if (...) tsem_post(x);` call site and discard the return value
// position of the original call - confirm all call sites before turning it on.
#undef tsem_post
#define tsem_post(x)                                         \
  tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \
  sem_post(x)
#endif

35
struct SMqMgmt{
36 37 38
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
39
};
L
Liu Jicong 已提交
40

41 42 43
// File-scope state for one-time TMQ initialization.
// tmqMgmt supplies the timer handle and ref-set id used by the timer callbacks below.
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT;   // initialize only once
volatile int32_t    tmqInitRes = 0;                // initialize rsp code
static struct SMqMgmt tmqMgmt = {0};
44

L
Liu Jicong 已提交
45 46 47 48 49 50
// Common leading fields of every response item placed on the consumer queue.
// tmqFreeRspWrapper() inspects tmqRspType and casts to the concrete wrapper type.
typedef struct {
  int8_t  tmqRspType;  // one of the TMQ_MSG_TYPE__* values
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
51 52 53
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
54 55
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
56
struct tmq_list_t {
L
Liu Jicong 已提交
57
  SArray container;
L
Liu Jicong 已提交
58
};
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
struct tmq_conf_t {
61 62 63 64 65 66 67 68
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
69 70 71 72 73
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
74
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
75
  void*          commitCbUserParam;
L
Liu Jicong 已提交
76 77 78
};

struct tmq_t {
79
  int64_t  refId;
L
Liu Jicong 已提交
80
  // conf
81 82 83 84 85 86 87 88 89
  char     groupId[TSDB_CGROUP_LEN];
  char     clientId[256];
  int8_t   withTbName;
  int8_t   useSnapshot;
  int8_t   autoCommit;
  int32_t  autoCommitInterval;
  int32_t  resetOffsetCfg;
  uint64_t consumerId;
  bool     hbBgEnable;
90

L
Liu Jicong 已提交
91 92
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
93 94 95 96

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
97 98
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
99
  int32_t epSkipCnt;
L
Liu Jicong 已提交
100
#endif
L
Liu Jicong 已提交
101 102
  int64_t pollCnt;

L
Liu Jicong 已提交
103
  // timer
104 105
  tmr_h hbLiveTimer;
  tmr_h epTimer;
L
Liu Jicong 已提交
106 107 108
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
109 110 111 112
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
113
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
114
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
115
  STaosQall*  qall;
L
Liu Jicong 已提交
116 117 118 119
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
120 121
};

X
Xiaoyu Wang 已提交
122 123 124 125 126 127 128 129
// Values for SMqClientVg.vgStatus.
// NOTE(review): presumably IDLE = no poll in flight, WAIT = awaiting a poll response - confirm with the poll code.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer lifecycle states.
// NOTE(review): presumably stored in tmq_t.status - the assignments are outside the visible code.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
134
// Task codes written to tmq_t.delayedTask by the timer callbacks and dispatched in
// tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
140
typedef struct {
141 142 143
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
144 145
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
L
Liu Jicong 已提交
146
  // connection info
147
  int32_t vgId;
X
Xiaoyu Wang 已提交
148
  int32_t vgStatus;
L
Liu Jicong 已提交
149
  int32_t vgSkipCnt;
150 151 152
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
153
typedef struct {
154
  // subscribe info
155 156 157
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
158
  SSchemaWrapper schema;
159 160
} SMqClientTopic;

L
Liu Jicong 已提交
161 162 163 164 165
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
166
  union {
L
Liu Jicong 已提交
167 168
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
169
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
170
  };
L
Liu Jicong 已提交
171 172
} SMqPollRspWrapper;

L
Liu Jicong 已提交
173
typedef struct {
174 175
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
176 177
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
178
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
179

L
Liu Jicong 已提交
180
typedef struct {
181 182
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
183
  int32_t code;
L
Liu Jicong 已提交
184
  int32_t async;
X
Xiaoyu Wang 已提交
185
  tsem_t  rspSem;
186 187
} SMqAskEpCbParam;

L
Liu Jicong 已提交
188
typedef struct {
189 190
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
191
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
192
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
193
  int32_t         vgId;
L
Liu Jicong 已提交
194
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
195
} SMqPollCbParam;
196

197
typedef struct {
198 199
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
200 201
  int8_t         automatic;
  int8_t         async;
L
Liu Jicong 已提交
202 203
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
204
  int32_t        rspErr;
205
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
206 207 208 209
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
210 211 212 213 214
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
215
  SMqClientVg*         pMqVg;
L
Liu Jicong 已提交
216 217
  /*char                 topicName[TSDB_TOPIC_FNAME_LEN];*/
  /*int32_t              vgId;*/
218
} SMqCommitCbParam;
219

220 221
static int32_t tmqAskEp(tmq_t* tmq, bool async);

222
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
223
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
224 225 226 227 228
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

229
  conf->withTbName = false;
L
Liu Jicong 已提交
230
  conf->autoCommit = true;
L
Liu Jicong 已提交
231
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
232
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
233
  conf->hbBgEnable = true;
234

235 236 237
  return conf;
}

L
Liu Jicong 已提交
238
/* Frees a configuration and the connection strings it owns. A NULL conf is ignored. */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip != NULL) {
    taosMemoryFree(conf->ip);
  }
  if (conf->user != NULL) {
    taosMemoryFree(conf->user);
  }
  if (conf->pass != NULL) {
    taosMemoryFree(conf->pass);
  }
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
248
  if (strcmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
249
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
250
    return TMQ_CONF_OK;
251
  }
L
Liu Jicong 已提交
252

253
  if (strcmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
254
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
255 256
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
257

L
Liu Jicong 已提交
258 259
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
260
      conf->autoCommit = true;
L
Liu Jicong 已提交
261 262
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
263
      conf->autoCommit = false;
L
Liu Jicong 已提交
264 265 266 267
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
268
  }
L
Liu Jicong 已提交
269

L
Liu Jicong 已提交
270 271 272 273 274
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
289

290 291
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
292
      conf->withTbName = true;
L
Liu Jicong 已提交
293
      return TMQ_CONF_OK;
294
    } else if (strcmp(value, "false") == 0) {
295
      conf->withTbName = false;
L
Liu Jicong 已提交
296
      return TMQ_CONF_OK;
297 298 299 300 301
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
302
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
303
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
304
      conf->snapEnable = true;
305 306
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
307
      conf->snapEnable = false;
308 309 310 311 312 313
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
314 315 316 317 318
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->snapBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

319 320 321
  if (strcmp(key, "enable.heartbeat.background") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->hbBgEnable = true;
L
Liu Jicong 已提交
322 323
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
324
      conf->hbBgEnable = false;
L
Liu Jicong 已提交
325 326 327 328
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
329
    return TMQ_CONF_OK;
L
Liu Jicong 已提交
330 331
  }

L
Liu Jicong 已提交
332
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
333 334 335
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
336
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
337 338 339
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
340
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
341 342 343
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
344
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
345 346 347
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
348
  if (strcmp(key, "td.connect.db") == 0) {
349
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
350 351 352
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
353
  return TMQ_CONF_UNKNOWN;
354 355 356
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
357 358
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
359 360
}

L
Liu Jicong 已提交
361 362
/*
 * Appends a copy of the topic name `src` to the list.
 *
 * Unless the name starts with a backtick, the stored copy is lower-cased.
 * The list owns the copy; it is freed by tmq_list_destroy().
 *
 * Returns 0 on success, -1 when list is NULL, src is NULL or empty, or memory
 * cannot be allocated.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = strdup(src);
  if (topic == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (topic[0] != '`') {
    strtolower(topic, src);
  }

  if (taosArrayPush(&list->container, &topic) == NULL) {
    taosMemoryFree(topic);  // the array did not take ownership
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
372
/* Destroys the list; every stored topic string is passed to taosMemoryFree. */
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
377 378 379 380 381 382 383 384 385 386
/* Returns the number of topics stored in the list. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/*
 * Exposes the list's backing storage as an array of C strings.
 * The pointer is the array's own pData, not a copy.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
387 388 389 390
/*
 * Writes "<topicName>:<vg>" into dst and returns the number of characters
 * written (excluding the terminator). The write is unbounded: the caller must
 * supply a buffer large enough for the topic name, ':' and the decimal id.
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
/*
 * Finishes a commit operation once no response is outstanding.
 *
 * Async: invokes the completion callback (tmq->commitCb for automatic commits,
 * otherwise pParamSet->userCb) and releases pParamSet, including its semaphore.
 * Sync: posts rspSem; the thread blocked in tsem_wait() owns pParamSet and
 * destroys/frees it, so nothing is released here.
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED when the
 * consumer can no longer be resolved from its ref id.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  // copy first: after tsem_post() in sync mode the waiter may free pParamSet at any time
  int64_t refId = pParamSet->refId;
  int8_t  async = pParamSet->async;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (async) {
      tsem_destroy(&pParamSet->rspSem);
      taosMemoryFree(pParamSet);
    } else {
      // wake the waiter instead of destroying the semaphore it is blocked on
      pParamSet->rspErr = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParamSet->rspSem);
    }
    return -1;
  }

  // if no more waiting rsp
  if (async) {
    // call async cb func
    if (pParamSet->automatic && tmq->commitCb) {
      tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
    } else if (!pParamSet->automatic && pParamSet->userCb) {
      pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
    }
    tsem_destroy(&pParamSet->rspSem);  // initialized by the commit entry points even in async mode
    taosMemoryFree(pParamSet);
  } else {
    tsem_post(&pParamSet->rspSem);
  }

  // balance taosAcquireRef() above so the consumer can be reclaimed
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;
}

L
Liu Jicong 已提交
423 424 425 426 427 428 429
/*
 * Atomically decrements the outstanding-response counter and runs
 * tmqCommitDone() for the decrement that brings it to zero.
 */
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
  if (atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1) != 0) {
    return;
  }
  tmqCommitDone(pParamSet);
}

430 431
/*
 * Response callback for one offset-commit request.
 *
 * If the response carries an ep set and the request recorded a vgroup, the
 * vgroup's ep set is replaced. The offset, response payload and response ep set
 * are then freed and the shared counter is decremented. `code` is not
 * inspected. Always returns 0.
 */
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pCbParam = param;
  SMqCommitCbParamSet* pSet = pCbParam->params;

  // there may be race condition. fix it
  SMqClientVg* pVg = pCbParam->pMqVg;
  if (pBuf->pEpSet != NULL && pVg != NULL) {
    SEp* pNew = GET_ACTIVE_EP(pBuf->pEpSet);
    SEp* pPrev = GET_ACTIVE_EP(&(pVg->epSet));
    uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pCbParam->pOffset->subKey, pVg->vgId,
           pNew->fqdn, pNew->port, pPrev->fqdn, pPrev->port);
    pVg->epSet = *pBuf->pEpSet;
  }

  // pOffset is still needed by the log line above, so release it only now
  taosMemoryFree(pCbParam->pOffset);
  taosMemoryFree(pBuf->pData);
  taosMemoryFree(pBuf->pEpSet);

  tmqCommitRspCountDown(pSet);
  return 0;
}

L
Liu Jicong 已提交
464 465 466 467 468 469
/*
 * Builds and sends one offset-commit request for a vgroup of a topic.
 *
 * tmq       - consumer issuing the commit
 * pVg       - vgroup whose currentOffset is committed
 * pTopic    - topic the vgroup belongs to
 * pParamSet - shared state of the commit; its counters are incremented just
 *             before the request is handed to the transport
 *
 * Returns 0 once the request was handed to asyncSendMsgToServer(), -1 when
 * encoding or an allocation failed (nothing is sent, counters are untouched).
 */
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pOffset->val = pVg->currentOffset;

  // subKey = <groupId> TMQ_SEPARATOR <topicName>
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    taosMemoryFree(pOffset);  // was leaked on this path
    return -1;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;
  pParam->pMqVg = pVg;  // there may be an race condition

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  SEp* pEp = &pVg->epSet.eps[pVg->epSet.inUse];
  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d",
           tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version,
           pEp->fqdn, pEp->port);

  // TODO: put into cb, the commit offset should be move to the callback function
  pVg->committedOffset = pVg->currentOffset;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;

  // count the request before sending: the response callback may run immediately
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  // send msg
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return 0;
}

/*
 * Commits the offset of the topic/vgroup that produced `msg`.
 *
 * tmq       - consumer
 * msg       - a TMQ data, meta or metadata result; its topic and vgId select
 *             the vgroup to commit
 * async     - non-zero: return right after sending, completion is reported via
 *             the callback; zero: block until the response arrived
 * userCb    - completion callback for async mode (may be NULL)
 * userParam - opaque argument passed to userCb
 *
 * Returns 0 when there was nothing to commit or the request was sent (async) /
 * acknowledged (sync, value of rspErr), TSDB_CODE_TMQ_INVALID_MSG for an
 * unsupported result type, -1 on allocation or send failure.
 */
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;

  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(msg)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  int32_t code = -1;
  // Tracked locally: in async mode the response callback may free pParamSet as
  // soon as the request is sent, so pParamSet->totalRspNum must not be read afterwards.
  bool reqSent = false;

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          // nothing was sent, so pParamSet is still exclusively ours
          tsem_destroy(&pParamSet->rspSem);
          taosMemoryFree(pParamSet);
          goto FAIL;
        }
        reqSent = true;
        goto HANDLE_RSP;
      }
    }
  }

HANDLE_RSP:
  if (!reqSent) {
    // no matching vgroup, or its offset is already committed
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return code;
  }

  // async: pParamSet now belongs to the response callback
  return 0;

FAIL:
  if (async && userCb != NULL) {
    userCb(tmq, code, userParam);
  }
  return code;  // report the failure instead of claiming success
}

L
Liu Jicong 已提交
627 628
/*
 * Commits the current offset of every vgroup of every subscribed topic whose
 * offset differs from the last committed one.
 *
 * tmq       - consumer
 * automatic - non-zero for timer-driven commits; completion then goes to
 *             tmq->commitCb instead of userCb
 * async     - non-zero: return after sending; zero: wait for all responses
 * userCb    - completion callback for non-automatic async commits (may be NULL)
 * userParam - opaque argument passed to userCb
 *
 * Returns 0 when nothing had to be committed or the async requests were sent,
 * the collected rspErr in sync mode, -1 when the shared state cannot be allocated.
 */
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                                     void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    if (async) {
      // either callback may be unset; tmqCommitDone() applies the same guard
      tmq_commit_cb* cb = automatic ? tmq->commitCb : userCb;
      void*          cbParam = automatic ? tmq->commitCbUserParam : userParam;
      if (cb != NULL) {
        cb(tmq, TSDB_CODE_OUT_OF_MEMORY, cbParam);
      }
    }
    return -1;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;

  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  // init as 1 to prevent concurrency issue
  pParamSet->waitingRspNum = 1;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);

    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;  // best effort: keep committing the remaining vgroups
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
      }
    }
  }

  // no request is sent; safe to read because the initial count of 1 is still held
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

  // count down since waiting rsp num init as 1
  tmqCommitRspCountDown(pParamSet);

  if (async) {
    // pParamSet may already have been released by the completion path
    return 0;
  }

  tsem_wait(&pParamSet->rspSem);
  int32_t code = pParamSet->rspErr;
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return code;
}

/*
 * Commit entry point: with a message, commits only that message's topic/vgroup
 * (the `automatic` flag is not forwarded); without one, commits every vgroup of
 * the consumer. Returns whatever the selected implementation returns.
 */
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                       void* userParam) {
  if (msg == NULL) {
    return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
  }
  return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
}

709
void tmqAssignAskEpTask(void* param, void* tmrId) {
710 711 712
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
713
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
714 715 716 717 718
    *pTaskType = TMQ_DELAYED_TASK__ASK_EP;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
719 720 721
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
722 723 724
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
725
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
726 727 728 729 730
    *pTaskType = TMQ_DELAYED_TASK__COMMIT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
731 732 733
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
734 735 736
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
737
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
738 739 740 741 742
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
  taosMemoryFree(param);
L
Liu Jicong 已提交
743 744
}

745
/*
 * Heartbeat response callback: the response content is ignored, only its
 * payload and ep set buffers are released. Always returns 0.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
754 755 756
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
L
Liu Jicong 已提交
757
    taosMemoryFree(param);
758 759
    return;
  }
D
dapan1121 已提交
760 761 762 763 764

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
765
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
766 767 768 769
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
    return;
  }
L
Liu Jicong 已提交
770
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
771 772 773 774 775 776 777 778 779
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
    return;
  }
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
    return;
  }
780 781 782 783

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
784
    goto OVER;
785 786 787
  }
  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
D
dapan1121 已提交
788
      .len = tlen,
789 790 791 792 793 794 795
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
796
  sendInfo->msgType = TDMT_MND_TMQ_HB;
797 798 799 800 801 802 803

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
804
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
805 806
}

807
/*
 * Drains the consumer's delayed-task queue and executes each task:
 *   ASK_EP - asks for endpoints asynchronously and schedules the next ask in 1 s
 *   COMMIT - starts an automatic async commit and schedules the next one after
 *            autoCommitInterval ms
 *   REPORT - no action
 *
 * Returns 0 (TSDB_CODE_SUCCESS when the queue was empty), -1 with
 * terrno = TSDB_CODE_OUT_OF_MEMORY when the drain list cannot be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      tmqAskEp(pTmq, true);

      // the timer callback takes ownership of this heap copy of the ref id
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      } else {
        tscError("consumer:0x%" PRIx64 " out of memory, ask-ep timer not rescheduled", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);

      // the timer callback takes ownership of this heap copy of the ref id
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next commit to mnode in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      } else {
        tscError("consumer:0x%" PRIx64 " out of memory, commit timer not rescheduled", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // do nothing
    } else {
      ASSERT(0);
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877
/*
 * Releases the payload owned by a queued response according to its type.
 * The wrapper itself is not freed (callers pass it to taosFreeQitem afterwards).
 * Unknown types are left untouched.
 */
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  const int8_t rspType = rspWrapper->tmqRspType;

  if (rspType == TMQ_MSG_TYPE__END_RSP) {
    // do nothing
    return;
  }

  if (rspType == TMQ_MSG_TYPE__EP_RSP) {
    SMqAskEpRspWrapper* pEpRsp = (SMqAskEpRspWrapper*)rspWrapper;
    tDeleteSMqAskEpRsp(&pEpRsp->msg);
    return;
  }

  // the remaining types all share the poll wrapper layout
  SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;

  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SMqDataRsp* pData = &pPoll->dataRsp;
    taosArrayDestroyP(pData->blockData, taosMemoryFree);
    taosArrayDestroy(pData->blockDataLen);
    taosArrayDestroyP(pData->blockTbName, taosMemoryFree);
    taosArrayDestroyP(pData->blockSchema, (FDelete)tDeleteSSchemaWrapper);
    return;
  }

  if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    taosMemoryFree(pPoll->metaRsp.metaRsp);
    return;
  }

  if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    STaosxRsp* pTaosx = &pPoll->taosxRsp;
    taosArrayDestroyP(pTaosx->blockData, taosMemoryFree);
    taosArrayDestroy(pTaosx->blockDataLen);
    taosArrayDestroyP(pTaosx->blockTbName, taosMemoryFree);
    taosArrayDestroyP(pTaosx->blockSchema, (FDelete)tDeleteSSchemaWrapper);
    // taosx
    taosArrayDestroy(pTaosx->createTableLen);
    taosArrayDestroyP(pTaosx->createTableReq, taosMemoryFree);
  }
}

L
Liu Jicong 已提交
878
// Discard every response the consumer has not handled yet.
// Pass 0 drains whatever is already staged in tmq->qall; pass 1 first moves the
// content of tmq->mqueue into tmq->qall and then drains it the same way.
void tmqClearUnhandleMsg(tmq_t* tmq) {
  for (int32_t pass = 0; pass < 2; ++pass) {
    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }

    SMqRspWrapper* pItem = NULL;
    for (;;) {
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        break;
      }
      // free the payload first, then the queue item that carried it
      tmqFreeRspWrapper(pItem);
      taosFreeQitem(pItem);
    }
  }
}

D
dapan1121 已提交
903
// Completion callback for the subscribe request sent by tmq_subscribe().
//   param: SMqSubscribeCbParam owned by the waiting caller (it lives on that caller's stack)
//   pMsg : response buffer; both pData and pEpSet are released here, matching what the
//          other callbacks in this file (tmqPollCb, tmqAskEpCb) do on every path
//   code : status of the request, handed back to the waiter through rspErr
// Always returns 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  // the response body is not consumed; free it so it does not leak
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  // must be last: once posted the waiter may return and pParam becomes invalid
  tsem_post(&pParam->rspSem);
  return 0;
}
911

L
Liu Jicong 已提交
912
// Append the names of the topics currently held in tmq->clientTopics to *topics.
// A new list is created when *topics is NULL. Stored topic names are copied starting
// after the first '.'; a name that contains no '.' is appended unchanged.
// Returns 0 on success, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno) when the
// list cannot be created.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr() yields NULL when there is no '.', so never do arithmetic on it blindly
    char* dot = strchr(topic->topicName, '.');
    char* name = (dot != NULL) ? dot + 1 : topic->topicName;
    tmq_list_append(*topics, name);
  }
  return 0;
}

L
Liu Jicong 已提交
923
// Unsubscribe by subscribing to an empty topic list.
// While the result is TSDB_CODE_MND_CONSUMER_NOT_READY the request is repeated after a
// 500ms pause, for at most 6 repeats (7 attempts in total). Returns the last result.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* emptyTopics = tmq_list_new();

  int32_t code = tmq_subscribe(tmq, emptyTopics);
  for (int32_t retried = 0; code == TSDB_CODE_MND_CONSUMER_NOT_READY && retried <= 5; retried++) {
    taosMsleep(500);
    code = tmq_subscribe(tmq, emptyTopics);
  }

  tmq_list_destroy(emptyTopics);
  return code;
}

941 942
void tmqFreeImpl(void* handle) {
  tmq_t* tmq = (tmq_t*)handle;
L
Liu Jicong 已提交
943

944
  // TODO stop timer
L
Liu Jicong 已提交
945 946 947 948
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
949
  if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
L
Liu Jicong 已提交
950
  taosFreeQall(tmq->qall);
L
Liu Jicong 已提交
951

952
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
953

954 955 956
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
957
    taosMemoryFreeClear(pTopic->schema.pSchema);
958 959 960 961 962
    taosArrayDestroy(pTopic->vgs);
  }
  taosArrayDestroy(tmq->clientTopics);
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
L
Liu Jicong 已提交
963 964
}

965 966 967 968 969 970 971 972 973
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
974
  if (tmqMgmt.rsetId < 0) {
975 976 977 978
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
979
// Create a consumer from the given configuration.
//   conf      : consumer configuration; groupId must be non-empty
//   errstr    : not written by this implementation
//   errstrLen : not used by this implementation
// Returns the new consumer, or NULL on failure. terrno is set on the module-init,
// allocation and empty-group paths; other failure paths leave terrno to the callee
// that failed.
// On success the consumer is registered in the TMQ ref set and, when hbBgEnable is set,
// a heartbeat timer is started.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int64_t* pHbRefId = NULL;  // argument handed to the heartbeat timer, if enabled

  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
      conf->groupId[0] == 0) {
    // NOTE(review): an empty group id is reported with the same out-of-memory code as an
    // allocation failure; a dedicated invalid-parameter code would be more accurate.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId has not been filled in yet, so report the configured group id
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             conf->groupId);
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  // Allocate the heartbeat timer argument BEFORE the consumer is registered in the ref
  // set, so that an allocation failure can still be unwound with a plain tmqFreeImpl().
  if (pTmq->hbBgEnable) {
    pHbRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pHbRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
               pTmq->groupId);
      tmqFreeImpl(pTmq);
      return NULL;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    taosMemoryFree(pHbRefId);
    tmqFreeImpl(pTmq);
    return NULL;
  }

  if (pHbRefId != NULL) {
    *pHbRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pHbRefId, tmqMgmt.timer);
  }

  tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId);
  return pTmq;

FAIL:
  // only the members created before the connection exists need to be released here
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);

  return NULL;
}

L
Liu Jicong 已提交
1072
// Subscribe the consumer to the topics in topic_list (an empty list unsubscribes).
// The request is sent to the mnode and this call blocks until tmqSubscribeCb() reports
// the result. Afterwards endpoints are fetched synchronously and, on first use, the
// ask-ep timer and (if autoCommit is on) the auto-commit timer are started.
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY on allocation failure, -1 when the
// response semaphore cannot be created, or the error code reported by the server.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 " subscribe %d topics", tmq->consumerId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // expand every topic into its full name; the array owns the allocated strings
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      // report the failure instead of falling through with code == 0
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    taosArrayPush(req.topicNames, &topicFName);
  }

  // first call sizes the message, second call writes it
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .refId = tmq->refId,
      .epoch = tmq->epoch,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    // nothing was sent; report a failure instead of returning success
    code = -1;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  // param lives on this stack frame, so wait for the callback before going on
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    if (retryCnt++ > 10) {
      // NOTE(review): retries are exhausted here but code is still 0, so the caller sees
      // success and no timer is started. Left unchanged because callers outside this
      // function may rely on the current return value; confirm the intended contract.
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  // shared exit: also taken on success, where buf and sendInfo are already NULL
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1196
// Store the commit callback and its user argument in the configuration.
// tmq_consumer_new() copies both values into the consumer it creates.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1201

D
dapan1121 已提交
1202
// Completion callback for a poll request sent to one vgroup.
//   param: heap-allocated SMqPollCbParam; always freed here
//   pMsg : response buffer; pData and pEpSet are always freed here
//   code : status of the request
// On success the decoded response is wrapped and written to tmq->mqueue and rspSem is
// posted. Returns 0 when the response was queued or dropped as stale, -1 otherwise.
// The consumer reference taken with taosAcquireRef() is released on every exit path.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;
  int64_t         refId = pParam->refId;  // kept for the release, pParam is freed early

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    // consumer is gone: nothing to deliver to, just drop everything
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t epoch = pParam->epoch;
  int32_t vgId = pParam->vgId;
  taosMemoryFree(pParam);
  if (code != 0) {
    // report the status handed to this callback, not whatever terrno holds on this thread
    tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, tstrerror(code));
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      goto CREATE_MSG_FAIL;
    }
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      // tell the poller that this vgroup has nothing to offer by queueing an END marker
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pRspWrapper == NULL) {
        tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
            tmqEpoch);
    tsem_post(&tmq->rspSem);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosReleaseRef(tmqMgmt.rsetId, refId);
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
    tscWarn("mismatch rsp from vgId:%d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;

  // In each branch the body after the SMqRspHead is decoded first, then the head bytes
  // are copied over the start of the decoded structure.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    tscDebug("consumer:0x%" PRIx64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
             tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
             rspType);

  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {
    ASSERT(0);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);

  taosWriteQitem(tmq->mqueue, pRspWrapper);
  tsem_post(&tmq->rspSem);

  taosReleaseRef(tmqMgmt.rsetId, refId);
  return 0;

CREATE_MSG_FAIL:
  // make the vgroup pollable again, but only if the request belongs to the current epoch
  if (epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return -1;
}

L
Liu Jicong 已提交
1319
// Replace the consumer's topic/vgroup table with the one carried by an ask-ep response.
//   tmq  : consumer to update
//   epoch: epoch to store once the table has been replaced
//   pRsp : response; the schema of every topic entry is moved out of it (the entry's
//          nCols/pSchema are reset) and handed to the new table
// Offsets of vgroups that already existed, keyed by "<topicName>:<vgId>", are carried
// over; new vgroups start from tmq->resetOffsetCfg.
// Returns true when at least one vgroup was installed, false otherwise (including the
// case where the temporary array or hash table cannot be allocated).
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);

  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:0x%" PRIx64" update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
           tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // step 1: remember the current offset of every existing vgroup
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        // bounded write: never run past vgKey even if the name is unexpectedly long
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        char buf[80];
        tFormatOffset(buf, sizeof(buf), &pVgCur->currentOffset);
        tscDebug("consumer:0x%" PRIx64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
                 pVgCur->vgId, vgKey, buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
      }
    }
  }

  // step 2: build the new table from the response
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    // take over the schema so that releasing the response does not free it
    topic.schema = pTopicEp->schema;
    pTopicEp->schema.nCols = 0;
    pTopicEp->schema.pSchema = NULL;
    tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer:0x%" PRIx64 ", update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
      if (pOffset != NULL) {
        offsetNew = *pOffset;
      }

      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offsetNew,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // destroy current buffered existed topics info
  if (tmq->clientTopics) {
    int32_t sz = taosArrayGetSize(tmq->clientTopics);
    for (int32_t i = 0; i < sz; i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
      taosArrayDestroy(pTopic->vgs);
    }

    taosArrayDestroy(tmq->clientTopics);
  }

  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  // step 3: publish status first, then the epoch
  if (taosArrayGetSize(tmq->clientTopics) == 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  } else {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  }

  atomic_store_32(&tmq->epoch, epoch);
  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
  return set;
}

D
dapan1121 已提交
1420
// Completion callback for the ask-ep request sent by tmqAskEp().
//   param: SMqAskEpCbParam. In async mode it is freed here; in sync mode the caller
//          blocked in tmqAskEp() owns it and is woken through rspSem.
//   pMsg : response buffer; pData and pEpSet are always freed here
//   code : status of the request
// A response with a newer epoch is applied directly (sync) or queued on tmq->mqueue for
// the poller (async). Returns `code`, or -1 when the consumer is closed or the queue
// item cannot be allocated.
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int8_t           async = pParam->async;
  int64_t          refId = pParam->refId;  // kept for the release, pParam may be freed
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  if (tmq == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    if (!async) {
      // A caller is blocked in tmqAskEp() on this semaphore and will free pParam itself,
      // so hand it the error and wake it instead of destroying the semaphore under it.
      pParam->code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
      tsem_post(&pParam->rspSem);
    } else {
      taosMemoryFree(pParam);
    }
    return -1;
  }

  pParam->code = code;
  if (code != 0) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId,
             async, tstrerror(code));
    goto END;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  if (head->epoch <= epoch) {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
             tmq->consumerId, head->epoch, epoch);
    goto END;
  }

  tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
           head->epoch, epoch);

  if (!async) {
    // sync: apply the new endpoints right here
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    tmqUpdateEp(tmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  } else {
    // async: hand the decoded response to the poller through the response queue
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
    if (pWrapper == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);

    taosWriteQitem(tmq->mqueue, pWrapper);
    tsem_post(&tmq->rspSem);
  }

END:
  if (!async) {
    // after this post the waiter may free pParam; it must not be touched any more
    tsem_post(&pParam->rspSem);
  } else {
    taosMemoryFree(pParam);
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  // balance the taosAcquireRef() at the top
  taosReleaseRef(tmqMgmt.rsetId, refId);
  return code;
}

L
Liu Jicong 已提交
1495
// Ask the mnode for the consumer's current topic endpoints.
//   tmq  : consumer
//   async: false - block until tmqAskEpCb() has handled the response and return the
//                  code it stored;
//          true  - return right after the request has been handed to the transport;
//                  tmqAskEpCb() then owns and frees the callback parameter.
// Returns TSDB_CODE_SUCCESS, the code stored by the callback (sync mode), or -1 when the
// request cannot be built.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = TSDB_CODE_SUCCESS;

  SMqAskEpReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
  strcpy(req.cgroup, tmq->groupId);

  // first call sizes the message, second call writes it
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
    return -1;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
    taosMemoryFree(pReq);
    return -1;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pParam->refId = tmq->refId;
  pParam->epoch = tmq->epoch;
  pParam->async = async;
  tsem_init(&pParam->rspSem, 0, 0);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = tmq->consumerId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
           tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // In async mode pParam belongs to the callback from here on and must not be touched.
  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // the semaphore was created above; release it together with its owner
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  return code;
}

L
Liu Jicong 已提交
1580
// Fill a poll request for one vgroup of one topic.
//   pReq   : request to fill
//   tmq    : consumer supplying group id, consumer id, epoch and poll options
//   timeout: copied into the request
//   pTopic : topic whose name forms the second half of the subscription key
//   pVg    : vgroup supplying the start offset and the target vgId
// The subscription key is written as "<groupId><TMQ_SEPARATOR><topicName>".
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // subscription key: group id, separator, topic name (NUL-terminated by strcpy)
  int32_t groupLen = strlen(tmq->groupId);
  char*   cursor = pReq->subKey;
  memcpy(cursor, tmq->groupId, groupLen);
  cursor += groupLen;
  *cursor = TMQ_SEPARATOR;
  cursor += 1;
  strcpy(cursor, pTopic->topicName);

  // routing
  pReq->head.vgId = pVg->vgId;

  // consumer identity and options
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;

  // per-request values
  pReq->timeout = timeout;
  pReq->reqOffset = pVg->currentOffset;
  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1602 1603
// Build the result object for a meta response taken from the response queue.
// The SMqMetaRsp is copied bytewise, so pointers inside it are shared with the wrapper
// rather than duplicated.
// Returns the new object, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when it cannot
// be allocated.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1613 1614 1615
// Build the result object for a data response taken from the response queue.
// The SMqDataRsp is copied bytewise, so the arrays it points to are shared with the
// wrapper rather than duplicated. When the response carries no schema of its own, the
// topic's schema is installed in the result info.
// Returns the new object, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when it cannot
// be allocated.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;  // no block has been selected yet
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1631 1632
// Build the result object for a taosx (meta + data) response taken from the response
// queue. The STaosxRsp is copied bytewise, so the arrays it points to are shared with
// the wrapper rather than duplicated. When the response carries no schema of its own,
// the topic's schema is installed in the result info.
// Returns the new object, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when it cannot
// be allocated.
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;  // no block has been selected yet
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1649
// broadcast the poll request to all related vnodes
1650
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1651 1652 1653 1654 1655
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64" start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);

  for (int i = 0; i < numOfTopics; i++) {

X
Xiaoyu Wang 已提交
1656 1657 1658 1659
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1660
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1661
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
1662
        tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
L
Liu Jicong 已提交
1663
                 vgSkipCnt);
X
Xiaoyu Wang 已提交
1664
        continue;
L
Liu Jicong 已提交
1665
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1666 1667 1668 1669
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
1670
        tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
L
temp  
Liu Jicong 已提交
1671 1672
        }
#endif
X
Xiaoyu Wang 已提交
1673
      }
1674

L
Liu Jicong 已提交
1675
      atomic_store_32(&pVg->vgSkipCnt, 0);
D
dapan1121 已提交
1676 1677 1678 1679 1680 1681 1682 1683 1684

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
1685

L
Liu Jicong 已提交
1686
      char* msg = taosMemoryCalloc(1, msgSize);
D
dapan1121 已提交
1687 1688 1689 1690 1691
      if (NULL == msg) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
L
Liu Jicong 已提交
1692

D
dapan1121 已提交
1693 1694
      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1695
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1696
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1697 1698
        return -1;
      }
L
Liu Jicong 已提交
1699

wafwerar's avatar
wafwerar 已提交
1700
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1701
      if (pParam == NULL) {
D
dapan1121 已提交
1702
        taosMemoryFree(msg);
X
Xiaoyu Wang 已提交
1703
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1704
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1705 1706
        return -1;
      }
1707

1708 1709 1710
      pParam->refId = tmq->refId;
      pParam->epoch = tmq->epoch;

L
Liu Jicong 已提交
1711
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1712
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1713
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1714

1715
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1716
      if (sendInfo == NULL) {
D
dapan1121 已提交
1717
        taosMemoryFree(msg);
wafwerar's avatar
wafwerar 已提交
1718
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1719
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1720
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1721 1722 1723 1724
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
D
dapan1121 已提交
1725 1726
          .pData = msg,
          .len = msgSize,
X
Xiaoyu Wang 已提交
1727 1728
          .handle = NULL,
      };
1729

D
dapan1121 已提交
1730
      sendInfo->requestId = req.reqId;
X
Xiaoyu Wang 已提交
1731
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1732
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1733
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1734
      sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
X
Xiaoyu Wang 已提交
1735 1736

      int64_t transporterId = 0;
L
Liu Jicong 已提交
1737

1738
      char offsetFormatBuf[80];
L
Liu Jicong 已提交
1739
      tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
1740 1741

      tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
D
dapan1121 已提交
1742
               tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
X
Xiaoyu Wang 已提交
1743
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
1744

X
Xiaoyu Wang 已提交
1745 1746 1747 1748
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
1749

X
Xiaoyu Wang 已提交
1750 1751 1752
  return 0;
}

L
Liu Jicong 已提交
1753 1754
// Handle a queued response that is not a poll response.
//
// Only TMQ_MSG_TYPE__EP_RSP is understood; for any other type -1 is returned
// and *pReset is left untouched.
//
// For an endpoint response:
//   - if its epoch is newer than the consumer's, the endpoints are updated via
//     tmqUpdateEp(), the response payload is released and *pReset is set to true;
//   - otherwise the wrapper content is released via tmqFreeRspWrapper() and
//     *pReset is set to false.
// In both cases 0 is returned. The queue item itself is not freed here.
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewerEpoch = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewerEpoch) {
    SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
    tmqUpdateEp(tmq, rspWrapper->epoch, pEpRsp);
    tDeleteSMqAskEpRsp(pEpRsp);
  } else {
    tmqFreeRspWrapper(rspWrapper);
  }

  *pReset = isNewerEpoch;
  return 0;
}

L
Liu Jicong 已提交
1773
// Drain queued responses until one of them yields a result object.
//
// Items are taken from tmq->qall; when that is empty it is refilled once from
// tmq->mqueue. Per item:
//   - TMQ_MSG_TYPE__END_RSP: item freed, terrno set to
//     TSDB_CODE_TQ_NO_COMMITTED_OFFSET, NULL returned.
//   - poll / meta / taosx responses: if the response epoch differs from the
//     consumer epoch the item is discarded; otherwise the vgroup offset is
//     advanced, the vgroup is set back to TMQ_VG_STATUS__IDLE and a result
//     object is built and returned (data/taosx responses with zero blocks are
//     dropped and the loop goes on).
//   - anything else is passed to tmqHandleNoPollRsp(); when that reports a
//     reset and pollIfReset is true, a new poll round is started.
//
// Returns the result object, or NULL when no more items are queued (or on END_RSP).
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  tscDebug("consumer:0x%"PRIx64" start to handle the rsp", tmq->consumerId);

  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      // local batch exhausted: pull everything pending from the shared queue and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
    }
    if (pItem == NULL) {
      return NULL;
    }

    int8_t rspType = pItem->tmqRspType;

    if (rspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pItem);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      tscDebug("consumer:0x%" PRIx64 " process poll rsp", tmq->consumerId);

      int32_t curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->dataRsp.head.epoch != curEpoch) {
        tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pPoll->dataRsp.head.epoch, curEpoch);
        tmqFreeRspWrapper(pItem);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVgroup = pPoll->vgHandle;
      pVgroup->currentOffset = pPoll->dataRsp.rspOffset;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->dataRsp.blockNum == 0) {
        // nothing to hand to the user, keep draining
        taosFreeQitem(pPoll);
        continue;
      }

      SMqRspObj* pDataRes = tmqBuildRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pDataRes;
    }

    if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;
      int32_t            curEpoch = atomic_load_32(&tmq->epoch);

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

      if (pPoll->metaRsp.head.epoch != curEpoch) {
        tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pPoll->metaRsp.head.epoch, curEpoch);
        tmqFreeRspWrapper(pItem);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVgroup = pPoll->vgHandle;
      pVgroup->currentOffset = pPoll->metaRsp.rspOffset;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      SMqMetaRspObj* pMetaRes = tmqBuildMetaRspFromWrapper(pPoll);
      taosFreeQitem(pPoll);
      return pMetaRes;
    }

    if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)pItem;

      int32_t curEpoch = atomic_load_32(&tmq->epoch);
      if (pPoll->taosxRsp.head.epoch != curEpoch) {
        tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pPoll->taosxRsp.head.epoch, curEpoch);
        tmqFreeRspWrapper(pItem);
        taosFreeQitem(pPoll);
        continue;
      }

      SMqClientVg* pVgroup = pPoll->vgHandle;
      pVgroup->currentOffset = pPoll->taosxRsp.rspOffset;
      atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);

      if (pPoll->taosxRsp.blockNum == 0) {
        // nothing to hand to the user, keep draining
        taosFreeQitem(pPoll);
        continue;
      }

      // without created tables a plain data result is built, otherwise a meta+data result
      void* pTaosxRes = NULL;
      if (pPoll->taosxRsp.createTableNum == 0) {
        pTaosxRes = tmqBuildRspFromWrapper(pPoll);
      } else {
        pTaosxRes = tmqBuildTaosxRspFromWrapper(pPoll);
      }
      taosFreeQitem(pPoll);
      return pTaosxRes;
    }

    // every remaining message type goes through the non-poll handler
    bool reset = false;
    tmqHandleNoPollRsp(tmq, pItem, &reset);
    taosFreeQitem(pItem);
    if (pollIfReset && reset) {
      tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
      tmqPollImpl(tmq, timeout);
    }
  }
}

1885
// Poll the consumer for the next result.
//
// tmq:     consumer handle.
// timeout: maximum time to wait, compared against taosGetTimestampMs() deltas;
//          -1 means wait without a deadline (the loop wakes up every 1000 to re-poll).
//
// Returns a result object, or NULL when
//   - the consumer is still in TMQ_CONSUMER_STATUS__INIT,
//   - the consumer is in TMQ_CONSUMER_STATUS__RECOVER and tmqAskEp() keeps
//     reporting TSDB_CODE_MND_CONSUMER_NOT_READY after the retries,
//   - tmqHandleAllRsp() reported TSDB_CODE_TQ_NO_COMMITTED_OFFSET via terrno,
//   - the timeout elapsed without a response.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  void*   rspObj = NULL;
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);

  // terrno is inspected below to detect TSDB_CODE_TQ_NO_COMMITTED_OFFSET; clear it
  // so a value left over from an earlier call cannot end this poll prematurely
  terrno = 0;

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    return NULL;
  }

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
      if (retryCnt++ > 10) {
        return NULL;
      }

      tscDebug("consumer:0x%"PRIx64" not ready, retry:%d/10 in 500ms", tmq->consumerId, retryCnt);
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);

    if (tmqPollImpl(tmq, timeout) < 0) {
      // NOTE(review): a poll failure is only logged; the loop still goes on to
      // process responses (the early return was already disabled in the original code)
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout != -1) {
      int64_t currentTime = taosGetTimestampMs();
      int64_t elapsedTime = currentTime - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
                 tmq->consumerId, tmq->epoch, startTime, currentTime);
        return NULL;
      }
      // sleep for the remaining budget, or until a response posts the semaphore
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
    }
  }
}

L
Liu Jicong 已提交
1954
// Close a consumer.
//
// When the consumer is in TMQ_CONSUMER_STATUS__READY, offsets are committed
// synchronously first; if that commit fails its code is returned and the
// consumer is left open. Then an empty topic list is subscribed (retried every
// 500ms while the server answers TSDB_CODE_MND_CONSUMER_NOT_READY, until the
// retry counter exceeds 5). Finally the consumer's reference is removed from
// the tmq reference set.
//
// Returns 0 on completion, or the non-zero code of the failed commit.
// NOTE(review): the final result of tmq_subscribe() is not propagated.
int32_t tmq_consumer_close(tmq_t* tmq) {
  // read the status atomically, consistent with tmq_consumer_poll()
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    while (1) {
      rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }

      retryCnt++;
      taosMsleep(500);
    }

    tmq_list_destroy(lst);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
1978

L
Liu Jicong 已提交
1979 1980
// Translate a tmq error code into a message string.
// 0 maps to "success", -1 to "fail"; every other code is delegated to tstrerror().
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1988

L
Liu Jicong 已提交
1989 1990 1991 1992 1993
// Classify a result handle returned by tmq_consumer_poll().
// Returns TMQ_RES_DATA, TMQ_RES_TABLE_META or TMQ_RES_METADATA according to the
// TD_RES_TMQ* predicate that matches, and TMQ_RES_INVALID when none does.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
2001
// Return the topic name of a tmq result with the leading "<prefix>." removed,
// i.e. the text after the first '.' of the stored topic name.
// If the stored name contains no '.', the whole name is returned.
// Returns NULL when res is not a tmq data, meta or meta+data result.
// The returned pointer refers to storage inside res.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that would be invalid
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2016 2017 2018 2019
// Return the database name of a tmq result with the leading "<prefix>." removed,
// i.e. the text after the first '.' of the stored db name.
// If the stored name contains no '.', the whole name is returned.
// Returns NULL when res is not a tmq data, meta or meta+data result.
// The returned pointer refers to storage inside res.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that would be invalid
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2031 2032 2033 2034
// Return the vgroup id recorded in a tmq result,
// or -1 when res is not a tmq data, meta or meta+data result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }
  return -1;
}
L
Liu Jicong 已提交
2045 2046 2047 2048 2049 2050 2051 2052

// Return the table name of the block the result iterator currently points at.
//
// Only data and meta+data results are supported. NULL is returned when res is
// of another kind, when the response carries no table names (withTbName unset
// or blockTbName NULL), or when resIter is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       hasNames = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
    bool       iterValid = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
    if (hasNames && iterValid) {
      return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    bool            hasNames = pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL;
    bool            iterValid = pTaosx->resIter >= 0 && pTaosx->resIter < pTaosx->rsp.blockNum;
    if (hasNames && iterValid) {
      return (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter);
    }
    return NULL;
  }

  return NULL;
}
2064

L
Liu Jicong 已提交
2065
// Commit entry point taking a completion callback.
// Forwards tmq, msg, cb and param to tmqCommitInner() with the two flag
// arguments set to (0, 1); the value returned by tmqCommitInner() is discarded.
// NOTE(review): presumably the second flag selects the asynchronous path and the
// result is delivered through cb -- confirm against tmqCommitInner().
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, msg, 0, 1, cb, param);
}

2070 2071
// Commit entry point without a callback.
// Forwards tmq and msg to tmqCommitInner() with both flag arguments set to 0
// and no callback, and returns its result code unchanged.
// NOTE(review): presumably this blocks until the commit completes -- confirm
// against tmqCommitInner().
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
  return code;
}