clientTmq.c 85.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22
#include "tdef.h"
#include "tglobal.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

X
Xiaoyu Wang 已提交
27 28
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
29

30 31
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);

X
Xiaoyu Wang 已提交
32
struct SMqMgmt {
33 34 35
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
36
};
L
Liu Jicong 已提交
37

X
Xiaoyu Wang 已提交
38 39
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
40
static struct SMqMgmt tmqMgmt = {0};
41

L
Liu Jicong 已提交
42 43 44 45 46 47
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
48 49 50
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
51 52
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
53
struct tmq_list_t {
L
Liu Jicong 已提交
54
  SArray container;
L
Liu Jicong 已提交
55
};
L
Liu Jicong 已提交
56

L
Liu Jicong 已提交
57
struct tmq_conf_t {
58 59 60 61 62 63 64 65
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
66 67 68 69 70
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
71
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
72
  void*          commitCbUserParam;
L
Liu Jicong 已提交
73 74 75
};

struct tmq_t {
76 77 78 79 80 81 82 83 84 85
  int64_t        refId;
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
  int8_t         withTbName;
  int8_t         useSnapshot;
  int8_t         autoCommit;
  int32_t        autoCommitInterval;
  int32_t        resetOffsetCfg;
  uint64_t       consumerId;
  bool           hbBgEnable;
L
Liu Jicong 已提交
86 87
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
88 89 90 91

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
92 93
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
94
  int32_t epSkipCnt;
L
Liu Jicong 已提交
95
#endif
96
  // poll info
X
Xiaoyu Wang 已提交
97 98
  int64_t pollCnt;
  int64_t totalRows;
L
Liu Jicong 已提交
99

L
Liu Jicong 已提交
100
  // timer
X
Xiaoyu Wang 已提交
101 102 103 104 105 106 107 108 109 110
  tmr_h       hbLiveTimer;
  tmr_h       epTimer;
  tmr_h       reportTimer;
  tmr_h       commitTimer;
  STscObj*    pTscObj;       // connection
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of rsp
  STaosQall*  qall;
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
  tsem_t      rspSem;
L
Liu Jicong 已提交
111 112
};

113 114
typedef struct SAskEpInfo {
  int32_t code;
H
Haojun Liao 已提交
115
  tsem_t  sem;
116 117
} SAskEpInfo;

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
126
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
127
  TMQ_CONSUMER_STATUS__RECOVER,
L
Liu Jicong 已提交
128 129
};

L
Liu Jicong 已提交
130
enum {
131
  TMQ_DELAYED_TASK__ASK_EP = 1,
L
Liu Jicong 已提交
132 133 134 135
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

H
Haojun Liao 已提交
136
typedef struct SVgOffsetInfo {
L
Liu Jicong 已提交
137 138
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
H
Haojun Liao 已提交
139 140 141 142 143 144 145 146 147 148 149
  int64_t      walVerBegin;
  int64_t      walVerEnd;
} SVgOffsetInfo;

typedef struct {
  int64_t       pollCnt;
  int64_t       numOfRows;
  SVgOffsetInfo offsetInfo;
  int32_t       vgId;
  int32_t       vgStatus;
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
150
  bool          receivedInfoFromVnode;// has already received info from vnode
H
Haojun Liao 已提交
151
  int64_t       emptyBlockReceiveTs;  // once empty block is received, idle for ignoreCnt then start to poll data
152
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
H
Haojun Liao 已提交
153
  SEpSet        epSet;
154 155
} SMqClientVg;

L
Liu Jicong 已提交
156
typedef struct {
157 158 159
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
160
  SSchemaWrapper schema;
161 162
} SMqClientTopic;

L
Liu Jicong 已提交
163 164
typedef struct {
  int8_t          tmqRspType;
165
  int32_t         epoch;  // epoch can be used to guard the vgHandle
166
  int32_t         vgId;
L
Liu Jicong 已提交
167 168
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
H
Haojun Liao 已提交
169
  uint64_t        reqId;
170
  SEpSet*         pEpset;
L
Liu Jicong 已提交
171
  union {
L
Liu Jicong 已提交
172 173
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
174
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
175
  };
L
Liu Jicong 已提交
176 177
} SMqPollRspWrapper;

L
Liu Jicong 已提交
178
typedef struct {
179 180
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
181 182
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
183
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
184

L
Liu Jicong 已提交
185
typedef struct {
186 187 188 189
  int64_t          refId;
  int32_t          epoch;
  void*            pParam;
  __tmq_askep_fn_t pUserFn;
190 191
} SMqAskEpCbParam;

L
Liu Jicong 已提交
192
typedef struct {
193 194
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
195
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
196
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
197
  int32_t         vgId;
X
Xiaoyu Wang 已提交
198
  uint64_t        requestId;  // request id for debug purpose
X
Xiaoyu Wang 已提交
199
} SMqPollCbParam;
200

201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
typedef struct SMqVgCommon {
  tsem_t        rsp;
  int32_t       numOfRsp;
  SArray*       pList;
  TdThreadMutex mutex;
  int64_t       consumerId;
  char*         pTopicName;
  int32_t       code;
} SMqVgCommon;

typedef struct SMqVgWalInfoParam {
  int32_t      vgId;
  int32_t      epoch;
  int32_t      totalReq;
  SMqVgCommon* pCommon;
} SMqVgWalInfoParam;

218
typedef struct {
219 220
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
221 222
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
223
  int32_t        code;
224
  tmq_commit_cb* callbackFn;
L
Liu Jicong 已提交
225 226
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
X
Xiaoyu Wang 已提交
227
  void* userParam;
228 229 230 231
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
232
  SMqVgOffset*         pOffset;
H
Haojun Liao 已提交
233 234 235
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
  int32_t              vgId;
  tmq_t*               pTmq;
236
} SMqCommitCbParam;
237

238 239 240 241 242
typedef struct SSyncCommitInfo {
  tsem_t  sem;
  int32_t code;
} SSyncCommitInfo;

243
static int32_t doAskEp(tmq_t* tmq);
244 245
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
246
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
247
                               int32_t index, int32_t totalVgroups, int32_t type);
X
Xiaoyu Wang 已提交
248 249 250
static void    commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void    asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void    addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
251

252
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
253
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
254 255 256 257 258
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

259
  conf->withTbName = false;
L
Liu Jicong 已提交
260
  conf->autoCommit = true;
261
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
262
  conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
263
  conf->hbBgEnable = true;
264

265 266 267
  return conf;
}

L
Liu Jicong 已提交
268
// Release a configuration from tmq_conf_new() together with the connection strings
// (ip, user, pass) that tmq_conf_set() duplicated into it. NULL is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  char* ownedStrings[] = {conf->ip, conf->user, conf->pass};
  for (size_t k = 0; k < sizeof(ownedStrings) / sizeof(ownedStrings[0]); ++k) {
    if (ownedStrings[k] != NULL) {
      taosMemoryFree(ownedStrings[k]);
    }
  }

  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
284
  if (strcasecmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
285
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
286
    return TMQ_CONF_OK;
287
  }
L
Liu Jicong 已提交
288

289
  if (strcasecmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
290
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
291 292
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
293

294 295
  if (strcasecmp(key, "enable.auto.commit") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
296
      conf->autoCommit = true;
L
Liu Jicong 已提交
297
      return TMQ_CONF_OK;
298
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
299
      conf->autoCommit = false;
L
Liu Jicong 已提交
300 301 302 303
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
304
  }
L
Liu Jicong 已提交
305

306
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
307
    conf->autoCommitInterval = taosStr2int64(value);
L
Liu Jicong 已提交
308 309 310
    return TMQ_CONF_OK;
  }

311 312 313
  if (strcasecmp(key, "auto.offset.reset") == 0) {
    if (strcasecmp(value, "none") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
L
Liu Jicong 已提交
314
      return TMQ_CONF_OK;
315 316
    } else if (strcasecmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
L
Liu Jicong 已提交
317
      return TMQ_CONF_OK;
318 319
    } else if (strcasecmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
L
Liu Jicong 已提交
320 321 322 323 324
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
325

326 327
  if (strcasecmp(key, "msg.with.table.name") == 0) {
    if (strcasecmp(value, "true") == 0) {
328
      conf->withTbName = true;
L
Liu Jicong 已提交
329
      return TMQ_CONF_OK;
330
    } else if (strcasecmp(value, "false") == 0) {
331
      conf->withTbName = false;
L
Liu Jicong 已提交
332
      return TMQ_CONF_OK;
333 334 335 336 337
    } else {
      return TMQ_CONF_INVALID;
    }
  }

338 339
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
340
      conf->snapEnable = true;
341
      return TMQ_CONF_OK;
342
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
343
      conf->snapEnable = false;
344 345 346 347 348 349
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

350
  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
351
    conf->snapBatchSize = taosStr2int64(value);
L
Liu Jicong 已提交
352 353 354
    return TMQ_CONF_OK;
  }

355
  if (strcasecmp(key, "enable.heartbeat.background") == 0) {
X
Xiaoyu Wang 已提交
356 357 358 359 360 361 362 363 364 365
    //    if (strcasecmp(value, "true") == 0) {
    //      conf->hbBgEnable = true;
    //      return TMQ_CONF_OK;
    //    } else if (strcasecmp(value, "false") == 0) {
    //      conf->hbBgEnable = false;
    //      return TMQ_CONF_OK;
    //    } else {
    tscError("the default value of enable.heartbeat.background is true, can not be seted");
    return TMQ_CONF_INVALID;
    //    }
L
Liu Jicong 已提交
366 367
  }

368
  if (strcasecmp(key, "td.connect.ip") == 0) {
369
    conf->ip = taosStrdup(value);
L
Liu Jicong 已提交
370 371
    return TMQ_CONF_OK;
  }
372

373
  if (strcasecmp(key, "td.connect.user") == 0) {
374
    conf->user = taosStrdup(value);
L
Liu Jicong 已提交
375 376
    return TMQ_CONF_OK;
  }
377

378
  if (strcasecmp(key, "td.connect.pass") == 0) {
379
    conf->pass = taosStrdup(value);
L
Liu Jicong 已提交
380 381
    return TMQ_CONF_OK;
  }
382

383
  if (strcasecmp(key, "td.connect.port") == 0) {
384
    conf->port = taosStr2int64(value);
L
Liu Jicong 已提交
385 386
    return TMQ_CONF_OK;
  }
387

388
  if (strcasecmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
389 390 391
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
392
  return TMQ_CONF_UNKNOWN;
393 394
}

X
Xiaoyu Wang 已提交
395
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
396

L
Liu Jicong 已提交
397 398
// Append a private copy of topic name `src` to `list`.
// Returns 0 on success, -1 when list/src is NULL, src is empty, or memory is exhausted.
// On failure the list is unchanged and no memory is leaked.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) {
    return -1;
  }

  char* topic = taosStrdup(src);
  if (topic == NULL) {
    return -1;
  }

  if (taosArrayPush(&list->container, &topic) == NULL) {
    taosMemoryFree(topic);  // the copy was never handed to the list
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
405
// Free a topic list from tmq_list_new() together with every topic string appended to it.
// A NULL list is ignored.
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) {
    return;
  }
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
410 411 412 413 414 415 416 417 418 419
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return container->pData;
}

X
Xiaoyu Wang 已提交
420 421
// Look up vgroup `vgId` of topic `pName` in pTopicList (SArray<SMqClientTopic>).
// On a hit, *index is the vgroup's position in the topic's vgs array and the vgroup is returned.
// Otherwise NULL is returned with *index == -1. *numOfVgroups is the vgroup count of the
// last topic whose name matched (0 if none matched).
static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
                                  int32_t* numOfVgroups) {
  *index = -1;
  *numOfVgroups = 0;

  int32_t topicCnt = taosArrayGetSize(pTopicList);
  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* pCandidate = taosArrayGet(pTopicList, t);
    if (strcmp(pCandidate->topicName, pName) == 0) {
      *numOfVgroups = taosArrayGetSize(pCandidate->vgs);
      for (int32_t v = 0; v < *numOfVgroups; ++v) {
        SMqClientVg* pVgEntry = taosArrayGet(pCandidate->vgs, v);
        if (pVgEntry->vgId == vgId) {
          *index = v;
          return pVgEntry;
        }
      }
    }
  }

  return NULL;
}
444

445 446 447
// Two problems do not need to be addressed here
// 1. update to of epset. the response of poll request will automatically handle this problem
// 2. commit failure. This one needs to be resolved.
H
Haojun Liao 已提交
448
// Response callback of one per-vgroup commit request built by doSendCommitMsg().
// Frees the encoded offset and the response buffers, then counts this response down
// against the owning SMqCommitCbParamSet. Always returns 0.
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pCommitParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pOwnerSet = (SMqCommitCbParamSet*)pCommitParam->params;
  tmq_t*               pConsumer = pCommitParam->pTmq;

  // NOTE(review): `code` is not inspected; a failed commit is neither retried nor recorded
  // in the parameter set. A retry via doSendCommitMsg() used to sit here as commented-out code.
  (void)code;

  taosMemoryFree(pCommitParam->pOffset);
  taosMemoryFree(pBuf->pData);
  taosMemoryFree(pBuf->pEpSet);

  commitRspCountDown(pOwnerSet, pConsumer->consumerId, pCommitParam->topicName, pCommitParam->vgId);
  return 0;
}

491
// Encode the current offset of one vgroup and send it asynchronously to that vnode.
//
// On success the request is counted in pParamSet (waitingRspNum and totalRspNum are
// incremented). Ownership of the offset, the message buffer and the callback parameter
// passes to the send info; tmqCommitCb() handles the response.
// On failure nothing is counted in pParamSet and every allocation made here is released.
// index/totalVgroups only feed the debug log.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or TSDB_CODE_INVALID_PARA when the
// offset cannot be encoded.
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
                               int32_t index, int32_t totalVgroups, int32_t type) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           len = 0;
  int32_t           groupLen = 0;
  void*             buf = NULL;
  SMqCommitCbParam* pParam = NULL;
  SMsgSendInfo*     pMsgSendInfo = NULL;
  SEp*              pEp = NULL;
  SEncoder          encoder;
  char              offsetBuf[80] = {0};
  char              commitBuf[80] = {0};
  int64_t           transporterId = 0;

  SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset));
  if (pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOffset->consumerId = tmq->consumerId;
  pOffset->offset.val = pVg->offsetInfo.currentOffset;

  // subscription key: <groupId><TMQ_SEPARATOR><topicName>
  groupLen = strlen(tmq->groupId);
  memcpy(pOffset->offset.subKey, tmq->groupId, groupLen);
  pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName);

  tEncodeSize(tEncodeMqVgOffset, pOffset, len, code);
  if (code < 0) {
    code = TSDB_CODE_INVALID_PARA;  // previously returned here without freeing pOffset
    goto _err;
  }

  buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), len);
  code = tEncodeMqVgOffset(&encoder, pOffset);
  tEncoderClear(&encoder);
  if (code < 0) {  // the encode result was previously ignored
    code = TSDB_CODE_INVALID_PARA;
    goto _err;
  }

  // build param
  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;
  pParam->vgId = pVg->vgId;
  pParam->pTmq = tmq;
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));

  // build send info
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = type;

  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  pEp = GET_ACTIVE_EP(&pVg->epSet);
  tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
           tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
           totalVgroups, pMsgSendInfo->requestId);

  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  return TSDB_CODE_SUCCESS;

_err:
  // pMsgSendInfo is only allocated by the last step, so it is never live here
  taosMemoryFree(pParam);
  taosMemoryFree(buf);
  taosMemoryFree(pOffset);
  return code;
}

H
Haojun Liao 已提交
580 581 582 583 584 585 586 587 588 589 590
// Return the first subscribed topic of `tmq` named pTopicName, or NULL (with an error
// log) when the consumer is not subscribed to it.
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
  int32_t         numOfTopics = taosArrayGetSize(tmq->clientTopics);
  SMqClientTopic* pMatch = NULL;

  for (int32_t i = 0; pMatch == NULL && i < numOfTopics; ++i) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pCandidate->topicName, pTopicName) == 0) {
      pMatch = pCandidate;
    }
  }

  if (pMatch == NULL) {
    tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName);
  }
  return pMatch;
}

595
// Commit the offset of the single vgroup that produced `pRes` (a TMQ data, meta or
// metadata result) using message type `type`.
//
// pCommitFp is invoked directly from here whenever no request ends up in flight:
//   - invalid arguments or an unknown result type;
//   - allocation failure;
//   - the topic or vgroup is no longer assigned;
//   - nothing new to commit;
//   - a send failure.
// Otherwise the callback is stored in the parameter set passed to doSendCommitMsg() and
// is expected to fire from the response path (commitRspCountDown) — NOTE(review): confirm.
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) {
  if (tmq == NULL || pRes == NULL) {
    pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
    return;
  }

  // identify the topic/vgroup this result came from
  char*   pTopicName = NULL;
  int32_t vgId = 0;
  if (TD_RES_TMQ(pRes)) {
    SMqRspObj* pDataObj = (SMqRspObj*)pRes;
    pTopicName = pDataObj->topic;
    vgId = pDataObj->vgId;
  } else if (TD_RES_TMQ_META(pRes)) {
    SMqMetaRspObj* pMetaObj = (SMqMetaRspObj*)pRes;
    pTopicName = pMetaObj->topic;
    vgId = pMetaObj->vgId;
  } else if (TD_RES_TMQ_METADATA(pRes)) {
    SMqTaosxRspObj* pTaosxObj = (SMqTaosxRspObj*)pRes;
    pTopicName = pTaosxObj->topic;
    vgId = pTaosxObj->vgId;
  } else {
    pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
    return;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }
  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);

  SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
  if (pTopic == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId,
            pTopicName, numOfTopics);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  int32_t      numOfVgroups = taosArrayGetSize(pTopic->vgs);
  int32_t      vgIndex = -1;
  SMqClientVg* pVg = NULL;
  for (int32_t i = 0; i < numOfVgroups; ++i) {
    SMqClientVg* pCandidate = taosArrayGet(pTopic->vgs, i);
    if (pCandidate->vgId == vgId) {
      pVg = pCandidate;
      vgIndex = i;
      break;
    }
  }

  if (pVg == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId,
            vgId, numOfVgroups, pTopicName);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  int32_t code = 0;
  bool    hasNewOffset = pVg->offsetInfo.currentOffset.type > 0 &&
                      !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset);
  if (hasNewOffset) {
    code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, vgIndex, numOfVgroups, type);
    if (code == TSDB_CODE_SUCCESS) {
      return;  // request in flight: the response path now owns pParamSet
    }
  }

  // nothing was sent (up to date, or the send failed): report straight to the caller
  taosMemoryFree(pParamSet);
  pCommitFp(tmq, code, userParam);
}

678
// Commit the current offset of every vgroup of every subscribed topic whose offset moved
// since the last commit, reporting the overall result through pCommitFp(tmq, code, userParam).
//
// committedOffset is advanced as soon as a request is handed to the transport. When no
// request could be sent, pCommitFp is called directly: with TSDB_CODE_SUCCESS if nothing
// needed committing, or with the last send error if every attempted send failed.
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  // init as 1 to prevent concurrency issue: responses that arrive while requests are still
  // being sent cannot drive the count to zero; the final commitRspCountDown() drops this guard.
  pParamSet->waitingRspNum = 1;

  int32_t lastErr = TSDB_CODE_SUCCESS;
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);

    tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
             numOfVgroups);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      if (pVg->offsetInfo.currentOffset.type > 0 &&
          !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
        int32_t code =
            doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET);
        if (code != TSDB_CODE_SUCCESS) {
          lastErr = code;
          // report the returned code: doSendCommitMsg() does not set terrno
          tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
                   tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version,
                   tstrerror(code), j + 1, numOfVgroups);
          continue;
        }

        // update the offset value.
        pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.currentOffset.version, j + 1,
                 numOfVgroups);
      }
    }
  }

  // totalRspNum counts sent requests; waitingRspNum may already have been decremented by
  // responses, so it under-reports the number of commits issued.
  int32_t numOfSent = pParamSet->totalRspNum;
  tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, numOfSent, numOfTopics);

  // no request is sent
  if (numOfSent == 0) {
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, lastErr, userParam);
    return;
  }

  // count down since waiting rsp num init as 1
  commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
}

737 738
static void generateTimedTask(int64_t refId, int32_t type) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
739
  if (tmq != NULL) {
S
Shengliang Guan 已提交
740
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
741
    *pTaskType = type;
742 743 744
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
745
  taosReleaseRef(tmqMgmt.rsetId, refId);
746 747 748 749 750
}

// Timer callback: queue an ask-endpoint task for the consumer whose refId is stored in the
// heap-allocated `param`, then release that parameter.
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t* pRefId = (int64_t*)param;
  generateTimedTask(*pRefId, TMQ_DELAYED_TASK__ASK_EP);
  taosMemoryFree(pRefId);
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
755
  int64_t refId = *(int64_t*)param;
756
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
757
  taosMemoryFree(param);
L
Liu Jicong 已提交
758 759 760
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
761 762 763
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
764
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
765 766 767 768
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
769 770

  taosReleaseRef(tmqMgmt.rsetId, refId);
771
  taosMemoryFree(param);
L
Liu Jicong 已提交
772 773
}

774
// Heartbeat response callback: the response content is not used, only its buffers are freed.
// Always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
783
  int64_t refId = *(int64_t*)param;
784

X
Xiaoyu Wang 已提交
785
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
786
  if (tmq == NULL) {
L
Liu Jicong 已提交
787
    taosMemoryFree(param);
788 789
    return;
  }
D
dapan1121 已提交
790 791 792 793 794

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
795
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
796 797
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
798
    goto OVER;
D
dapan1121 已提交
799
  }
800

L
Liu Jicong 已提交
801
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
802 803
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
804
    goto OVER;
D
dapan1121 已提交
805
  }
806

D
dapan1121 已提交
807 808 809
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
810
    goto OVER;
D
dapan1121 已提交
811
  }
812 813 814 815

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
816
    goto OVER;
817
  }
818

819
  sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
820 821 822 823 824

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
825
  sendInfo->msgType = TDMT_MND_TMQ_HB;
826 827 828 829 830 831 832

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
833
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
834
  taosReleaseRef(tmqMgmt.rsetId, refId);
835 836
}

837 838
/*
 * Auto-commit callback used when the application registered none (see tmqHandleAllDelayedTask).
 * Successful commits are silent; failures are only logged at debug level. `param` is ignored.
 */
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;
  }

  tscDebug("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}

843
/*
 * Drains the consumer's delayed-task queue and runs every pending task.
 *
 * Task types:
 *  - TMQ_DELAYED_TASK__ASK_EP: asks the mnode for fresh endpoints, then re-arms the ask-ep timer (1s).
 *  - TMQ_DELAYED_TASK__COMMIT: commits all offsets through the user callback (or defaultCommitCbFn), then re-arms
 *    the auto-commit timer (autoCommitInterval ms).
 *  - TMQ_DELAYED_TASK__REPORT: currently a no-op.
 *
 * Each task item is freed after it is handled. If the refId holder for a timer cannot be allocated, the failure
 * is logged and that timer is not re-armed, instead of dereferencing NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the queue snapshot could not be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      asyncAskEp(pTmq, addToQueueCallbackFn, NULL);

      // refId holder handed to the timer callback
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to re-arm ask-ep timer since out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;

      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to re-arm auto-commit timer since out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do yet
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

886
/*
 * Releases the payload owned by a queued response wrapper, chosen by its tmqRspType.
 *
 * The wrapper itself is not freed; callers release it with taosFreeQitem afterwards.
 * END_RSP and unknown types own nothing and are left alone. Always returns NULL.
 */
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pAskEp = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pAskEp->msg);
      break;
    }

    case TMQ_MSG_TYPE__POLL_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      SMqDataRsp* pData = &pPoll->dataRsp;
      taosArrayDestroyP(pData->blockData, taosMemoryFree);
      taosArrayDestroy(pData->blockDataLen);
      taosArrayDestroyP(pData->blockTbName, taosMemoryFree);
      taosArrayDestroyP(pData->blockSchema, (FDelete)tDeleteSchemaWrapper);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);
      taosMemoryFree(pPoll->metaRsp.metaRsp);
      break;
    }

    case TMQ_MSG_TYPE__TAOSX_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
      taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);

      // taosx-specific: create-table requests carried along with the data
      taosArrayDestroy(pPoll->taosxRsp.createTableLen);
      taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
      break;
    }

    case TMQ_MSG_TYPE__END_RSP:
    default:
      break;
  }

  return NULL;
}

L
Liu Jicong 已提交
921
/*
 * Discards every response the application has not consumed yet.
 *
 * The first pass empties the items already moved into tmq->qall. The second pass snapshots everything still
 * pending in tmq->mqueue into tmq->qall and empties that as well. Each wrapper's payload is released with
 * tmqFreeRspWrapper before the queue item itself is freed.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  for (int32_t pass = 0; pass < 2; pass++) {
    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }

    SMqRspWrapper* pPending = NULL;
    for (;;) {
      taosGetQitem(tmq->qall, (void**)&pPending);
      if (pPending == NULL) {
        break;
      }

      tmqFreeRspWrapper(pPending);
      taosFreeQitem(pPending);
    }
  }
}

D
dapan1121 已提交
946
/*
 * Response handler for TDMT_MND_TMQ_SUBSCRIBE.
 *
 * Stores the result code in the caller-provided SMqSubscribeCbParam (a local of tmq_subscribe) and posts its
 * semaphore to wake the waiting thread. Only the response's endpoint set is released here. Always returns 0.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  taosMemoryFree(pMsg->pEpSet);

  SMqSubscribeCbParam* pWaiter = (SMqSubscribeCbParam*)param;
  pWaiter->rspErr = code;  // must be visible before the waiter wakes up
  tsem_post(&pWaiter->rspSem);

  return 0;
}
954

L
Liu Jicong 已提交
955
/*
 * Appends the names of the consumer's current topics to *topics.
 *
 * If *topics is NULL, a new list is created and stored there for the caller. For each topic, only the part after
 * the first '.' of the stored full name is appended (full names presumably look like "<acctId>.<topic>"). A name
 * without a '.' is appended unchanged instead of reading past a NULL strchr result.
 *
 * Returns 0 on success, or TSDB_CODE_OUT_OF_MEMORY if a new list could not be created.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfTopics = (int32_t)taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    const char*     pDot = strchr(topic->topicName, '.');
    tmq_list_append(*topics, (pDot != NULL) ? pDot + 1 : topic->topicName);
  }

  return 0;
}

L
Liu Jicong 已提交
966
/*
 * Unsubscribes the consumer from all topics by subscribing it to an empty topic list.
 *
 * While the mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY, the request is retried every 500ms. It gives up after
 * 6 retries (7 attempts in total).
 *
 * Returns the result of the last tmq_subscribe call, or TSDB_CODE_OUT_OF_MEMORY if the empty list could not be
 * allocated. Previously a NULL list was passed on to tmq_subscribe, which dereferences it.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t rsp = 0;
  int32_t retryCnt = 0;
  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    }

    retryCnt++;
    taosMsleep(500);
  }

  tmq_list_destroy(lst);
  return rsp;
}

984 985 986 987 988 989
static void freeClientVgImpl(void* param) {
  SMqClientTopic* pTopic = param;
  taosMemoryFreeClear(pTopic->schema.pSchema);
  taosArrayDestroy(pTopic->vgs);
}

990
void tmqFreeImpl(void* handle) {
991 992
  tmq_t*  tmq = (tmq_t*)handle;
  int64_t id = tmq->consumerId;
L
Liu Jicong 已提交
993

994
  // TODO stop timer
L
Liu Jicong 已提交
995 996 997 998
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
L
Liu Jicong 已提交
999

H
Haojun Liao 已提交
1000 1001 1002 1003 1004
  if (tmq->delayedTask) {
    taosCloseQueue(tmq->delayedTask);
  }

  taosFreeQall(tmq->qall);
1005
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
1006

1007
  taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
1008 1009
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
1010 1011

  tscDebug("consumer:0x%" PRIx64 " closed", id);
L
Liu Jicong 已提交
1012 1013
}

1014 1015 1016 1017 1018 1019 1020 1021 1022
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
1023
  if (tmqMgmt.rsetId < 0) {
1024 1025 1026 1027
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
1028
/*
 * Creates a consumer from `conf`.
 *
 * Steps: runs the one-time TMQ initialisation, validates the configuration, allocates the consumer and its
 * queues, connects to the cluster, registers the consumer in the tmq ref set, and optionally starts the
 * background heartbeat timer.
 *
 * Returns the new consumer, or NULL with terrno set:
 *  - TSDB_CODE_INVALID_PARA when conf is NULL or conf->groupId is empty;
 *  - TSDB_CODE_OUT_OF_MEMORY on allocation failure;
 *  - the error of the failed sub-step otherwise.
 * errstr / errstrLen are currently not written.
 *
 * The semaphore is initialised right after the object is allocated. Every later failure then goes through
 * tmqFreeImpl, which destroys it exactly once; a tsem_init failure frees the bare object directly.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  // an empty group id is a caller error, not an allocation failure
  if (conf == NULL || conf->groupId[0] == 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    tscError("failed to create consumer since %s, groupId must be specified", terrstr());
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  // init semaphore first: tmqFreeImpl destroys it unconditionally, so it must be valid on every _failed path
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    tscError("failed to create consumer since %s, groupId:%s", terrstr(), conf->groupId);
    taosMemoryFree(pTmq);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->delayedTask = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId is not copied yet, so log the configured one
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), conf->groupId);
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    // rspSem is destroyed by tmqFreeImpl; destroying it here as well was a double destroy
    goto _failed;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    goto _failed;
  }

  if (pTmq->hbBgEnable) {
    int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      tscWarn("consumer:0x%" PRIx64 " failed to start background heartbeat since out of memory", pTmq->consumerId);
    } else {
      *pRefId = pTmq->refId;
      pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
    }
  }

  char         buf[80] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf, pTmq->hbBgEnable);

  return pTmq;

_failed:
  tmqFreeImpl(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1119
/*
 * Subscribes the consumer to `topic_list`; an empty list unsubscribes.
 *
 * Flow:
 *  1. Sends a TDMT_MND_TMQ_SUBSCRIBE request and waits synchronously for tmqSubscribeCb.
 *  2. Asks for endpoints, retrying every 500ms while the mnode reports TSDB_CODE_MND_CONSUMER_NOT_READY
 *     (at most MAX_RETRY_COUNT times).
 *  3. Starts the ask-ep timer and, with auto-commit enabled, the commit timer, unless they are already running.
 *
 * Returns 0 on success, otherwise an error code. Allocation failures return TSDB_CODE_OUT_OF_MEMORY; the
 * topic-name allocation failure was previously reported as success.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const int32_t   MAX_RETRY_COUNT = 120 * 4;  // let's wait for 4 mins at most
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    // req.topicNames owns the name once pushed; free it ourselves if the push fails
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      taosMemoryFree(topicFName);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // lives on this stack frame; tmqSubscribeCb fills rspErr and posts rspSem while we block below
  SMqSubscribeCbParam param = { .rspErr = 0, .refId = tmq->refId, .epoch = tmq->epoch };
  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
    if (retryCnt++ > MAX_RETRY_COUNT) {
      tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, max retry reached:%d", tmq->consumerId, retryCnt);
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1238
/*
 * Stores the application's auto-commit callback and its opaque user parameter in the configuration.
 * tmq_consumer_new copies both into the consumer.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1242

1243
/*
 * Response handler for a poll request sent to a vnode.
 *
 * param: heap-allocated SMqPollCbParam (refId, vgroup/topic handles, epoch, vgId, requestId); always freed here.
 * pMsg:  the response. pData is always freed here. pEpSet is either freed here or handed over to the queued
 *        wrapper (pRspWrapper->pEpset).
 * code:  result code of the request.
 *
 * Success: the response is decoded into an SMqPollRspWrapper, pushed to tmq->mqueue, tmq->rspSem is posted, and
 * 0 is returned.
 *
 * Failure, including responses with an unknown message type: the vgroup is reset to IDLE if the request still
 * belongs to the current epoch, tmq->rspSem is posted, and -1 is returned. An unknown type used to be queued,
 * which leaked the transferred pEpset (tmqFreeRspWrapper ignores unknown types) and did not reset the vgroup.
 */
static int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;

  int64_t         refId = pParam->refId;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    // consumer already closed: nothing to deliver to, just release what we own
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t  epoch = pParam->epoch;
  int32_t  vgId = pParam->vgId;
  uint64_t requestId = pParam->requestId;

  taosMemoryFree(pParam);

  if (code != 0) {
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

    // in case of consumer mismatch, wait for 500ms and retry
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      taosMsleep(500);
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
               tmq->consumerId);
    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      // queue an END_RSP marker for the polling thread
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pRspWrapper == NULL) {
        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
                tmq->consumerId, vgId, epoch, requestId);
        goto CREATE_MSG_FAIL;
      }

      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
    } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {  // poll data while insert
      taosMsleep(500);
    } else {
      tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
               vgId, epoch, tstrerror(code), requestId);
    }

    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < clientEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("consumer:0x%" PRIx64
            " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);

    tsem_post(&tmq->rspSem);
    taosReleaseRef(tmqMgmt.rsetId, refId);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    return 0;
  }

  if (msgEpoch != clientEpoch) {
    tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
  }

  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
  if (rspType != TMQ_MSG_TYPE__POLL_RSP && rspType != TMQ_MSG_TYPE__POLL_META_RSP &&
      rspType != TMQ_MSG_TYPE__TAOSX_RSP) {  // invalid rspType: drop it instead of queuing it
    tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    goto CREATE_MSG_FAIL;
  }

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId,
            epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
  pRspWrapper->reqId = requestId;
  pRspWrapper->pEpset = pMsg->pEpSet;  // ownership moves to the wrapper
  pRspWrapper->vgId = pVg->vgId;

  pMsg->pEpSet = NULL;

  // the body follows the SMqRspHead; after decoding, the head is copied over the start of the decoded struct
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    char buf[80];
    tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset);
    tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {  // TMQ_MSG_TYPE__TAOSX_RSP
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  }

  taosMemoryFree(pMsg->pData);
  taosWriteQitem(tmq->mqueue, pRspWrapper);

  int32_t total = taosQueueItemSize(tmq->mqueue);
  tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
           tmq->consumerId, rspType, vgId, total, requestId);

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);

  return 0;

CREATE_MSG_FAIL:
  // Only touch pVg while the request still belongs to the current epoch: pVg presumably points into
  // tmq->clientTopics, which doUpdateLocalEp replaces (and frees) when the epoch advances.
  if (epoch == atomic_load_32(&tmq->epoch)) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);

  return -1;
}

H
Haojun Liao 已提交
1386 1387 1388 1389 1390
// Per-vgroup state carried across an endpoint refresh. doUpdateLocalEp stores one entry per existing vgroup in a
// temporary hash keyed by makeTopicVgroupKey(topic, vgId); initClientTopicFromRsp reads it back so that a
// re-created SMqClientVg keeps its position.
typedef struct SVgroupSaveInfo {
  STqOffsetVal offset;     // the vgroup's offsetInfo.currentOffset at refresh time
  int64_t      numOfRows;  // the vgroup's numOfRows counter at refresh time
} SVgroupSaveInfo;

H
Haojun Liao 已提交
1391 1392 1393 1394 1395 1396
/*
 * Fills pTopic from one topic entry of an ask-ep response.
 *
 * The schema is moved out of pTopicEp, which is left with an empty schema so that only pTopic owns it. One
 * SMqClientVg is created per vgroup in pTopicEp->vgs:
 *  - if pVgOffsetHashMap has an entry for (topic, vgId), the vgroup resumes from the saved offset and row count;
 *  - otherwise it starts from the consumer's reset-offset policy (tmq->resetOffsetCfg) with zero rows.
 */
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
                                   tmq_t* tmq) {
  // take over the schema
  pTopic->schema = pTopicEp->schema;
  pTopicEp->schema.nCols = 0;
  pTopicEp->schema.pSchema = NULL;

  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);

  int32_t numOfVgs = taosArrayGetSize(pTopicEp->vgs);
  tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgs);
  pTopic->vgs = taosArrayInit(numOfVgs, sizeof(SMqClientVg));

  char key[TSDB_TOPIC_FNAME_LEN + 22];
  for (int32_t idx = 0; idx < numOfVgs; idx++) {
    SMqSubVgEp* pEp = taosArrayGet(pTopicEp->vgs, idx);

    makeTopicVgroupKey(key, pTopic->topicName, pEp->vgId);
    const SVgroupSaveInfo* pSaved = taosHashGet(pVgOffsetHashMap, key, strlen(key));

    STqOffsetVal startOffset = {.type = tmq->resetOffsetCfg};
    int64_t      rows = 0;
    if (pSaved != NULL) {
      startOffset = pSaved->offset;
      rows = pSaved->numOfRows;
    }

    SMqClientVg vg = {
        .pollCnt = 0,
        .vgId = pEp->vgId,
        .epSet = pEp->epSet,
        .vgStatus = TMQ_VG_STATUS__IDLE,
        .vgSkipCnt = 0,
        .emptyBlockReceiveTs = 0,
        .numOfRows = rows,
        .offsetInfo.currentOffset = startOffset,
        .offsetInfo.committedOffset = startOffset,
        .offsetInfo.walVerBegin = -1,
        .offsetInfo.walVerEnd = -1,
        .seekUpdated = false,
        .receivedInfoFromVnode = false,
    };

    taosArrayPush(pTopic->vgs, &vg);
  }
}

static void freeClientVgInfo(void* param) {
  SMqClientTopic* pTopic = param;
  if (pTopic->schema.nCols) {
    taosMemoryFreeClear(pTopic->schema.pSchema);
  }

  taosArrayDestroy(pTopic->vgs);
}

1449
/*
 * Replaces tmq->clientTopics with the topic/vgroup layout of an ask-ep response from a newer epoch.
 *
 * The offset and row counter of every currently known vgroup are stashed in a temporary hash keyed by
 * makeTopicVgroupKey, so vgroups that survive the update keep their position. Afterwards the consumer status
 * becomes NO_TOPIC (no topics) or READY, and tmq->epoch is advanced to `epoch`.
 *
 * A response whose epoch is not newer than tmq->epoch, or an allocation failure, leaves the state untouched.
 * NOTE(review): the function always returns false because `set` is never raised; callers should not rely on it.
 */
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  bool    set = false;
  int32_t numOld = taosArrayGetSize(tmq->clientTopics);
  int32_t numNew = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];

  tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
           tmq->consumerId, tmq->epoch, epoch, numNew, numOld);
  if (epoch <= tmq->epoch) {
    return set;
  }

  SArray* newTopics = taosArrayInit(numNew, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return set;
  }

  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pVgOffsetHashMap == NULL) {
    taosArrayDestroy(newTopics);
    return set;
  }

  // 1. remember where every currently known vgroup stands
  for (int32_t t = 0; t < numOld; t++) {
    SMqClientTopic* pOld = taosArrayGet(tmq->clientTopics, t);
    if (pOld->vgs == NULL) {
      continue;
    }

    int32_t numVgs = taosArrayGetSize(pOld->vgs);
    tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, numVgs);

    for (int32_t v = 0; v < numVgs; v++) {
      SMqClientVg* pVg = taosArrayGet(pOld->vgs, v);
      makeTopicVgroupKey(vgKey, pOld->topicName, pVg->vgId);

      char offsetStr[80];
      tFormatOffset(offsetStr, 80, &pVg->offsetInfo.currentOffset);
      tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVg->vgId,
               vgKey, offsetStr);

      SVgroupSaveInfo saved = {.offset = pVg->offsetInfo.currentOffset, .numOfRows = pVg->numOfRows};
      taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &saved, sizeof(SVgroupSaveInfo));
    }
  }

  // 2. build the new topic list, restoring saved positions
  for (int32_t t = 0; t < numNew; t++) {
    SMqClientTopic fresh = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, t);
    initClientTopicFromRsp(&fresh, pTopicEp, pVgOffsetHashMap, tmq);
    taosArrayPush(newTopics, &fresh);
  }

  taosHashCleanup(pVgOffsetHashMap);

  // 3. swap in the new list, then publish status and epoch
  if (tmq->clientTopics != NULL) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
  }
  tmq->clientTopics = newTopics;

  int8_t status = (numNew == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
  atomic_store_8(&tmq->status, status);
  atomic_store_32(&tmq->epoch, epoch);

  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
  return set;
}

1518
/*
 * Response handler for an ask-ep request; forwards the outcome to pParam->pUserFn.
 *
 * Outcomes:
 *  - consumer already closed: pUserFn(NULL, TSDB_CODE_TMQ_CONSUMER_CLOSED, NULL, ...); returns terrno.
 *  - request failed: pUserFn(tmq, code, NULL, ...); returns code.
 *  - success: pUserFn(tmq, code, pMsg, ...); returns code. Before that, if the response epoch is not newer than
 *    the local one and the consumer is in RECOVER state, the status is reset to NO_TOPIC or READY based on the
 *    response's topic list.
 *
 * pMsg->pData, pMsg->pEpSet and pParam are always freed before returning.
 */
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pParam);
    return terrno;
  }

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    pParam->pUserFn(tmq, code, NULL, pParam->pParam);
    goto _release;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* pHead = pMsg->pData;
  int32_t     localEpoch = atomic_load_32(&tmq->epoch);
  bool        isStale = (pHead->epoch <= localEpoch);

  if (!isStale) {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
             pHead->epoch, localEpoch);
  } else {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
             tmq->consumerId, pHead->epoch, localEpoch);

    if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
      SMqAskEpRsp epRsp;
      tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &epRsp);
      int8_t status =
          (taosArrayGetSize(epRsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
      atomic_store_8(&tmq->status, status);
      tDeleteSMqAskEpRsp(&epRsp);
    }
  }

  pParam->pUserFn(tmq, code, pMsg, pParam->pParam);

_release:
  taosReleaseRef(tmqMgmt.rsetId, pParam->refId);

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pParam);
  return code;
}

L
Liu Jicong 已提交
1575
// Fill a poll (consume) request for one vgroup of one subscribed topic.
//
// The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". It is written with snprintf so that an over-long
// group id / topic name is truncated (and the key always NUL-terminated) instead of overflowing subKey.
// NOTE(review): relies on SMqPollReq::subKey being a fixed-size char array (sizeof gives its capacity) — confirm.
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  snprintf(pReq->subKey, sizeof(pReq->subKey), "%s%c%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);

  pReq->withTbName = tmq->withTbName;
  pReq->consumerId = tmq->consumerId;
  pReq->timeout = timeout;
  pReq->epoch = tmq->epoch;
  // resume from the vgroup's current client-side offset
  pReq->reqOffset = pVg->offsetInfo.currentOffset;
  pReq->head.vgId = pVg->vgId;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1591 1592
// Build the user-visible result object for a meta (RES_TYPE__TMQ_META) poll response.
//
// The meta response is shallow-copied (memcpy) into the new object. The caller only frees the queue item
// afterwards, so pointers inside metaRsp are presumably owned by the returned object from now on.
// Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY if the object cannot be allocated.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

1602
// Build the user-visible result object (RES_TYPE__TMQ) for a data poll response.
//
// pWrapper->dataRsp is shallow-copied into the result. The number of rows contained in its blocks is added to
// pVg->numOfRows and stored in *numOfRows. *numOfRows is always initialised to 0 first, even when allocation fails.
// Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY if the object cannot be allocated.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  (*numOfRows) = 0;

  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;  // no block fetched yet
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  // when the response carries no schema of its own, use the schema cached on the topic handle
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet; each block's numOfRows is byte-swapped with htobe64, i.e. it is stored
  // big-endian in the block
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1632 1633
// Build the user-visible result object (RES_TYPE__TMQ_METADATA) for a taosx poll response that carries table-creation
// metadata in addition to data.
//
// pWrapper->taosxRsp is shallow-copied into the result. If the response has no schema, the topic's cached schema is used.
// Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY if the object cannot be allocated.
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;  // no block fetched yet
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682
// Undo the claim on a vgroup whose poll request could not be sent.
//
// The vgroup goes back to IDLE so a later round can poll it again. rspSem is posted so that a thread waiting in
// tmq_consumer_poll is woken instead of waiting for a response that will never come. Always returns -1.
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
  const int32_t failure = -1;

  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&pTmq->rspSem);

  return failure;
}

// Serialize a poll request for one vgroup and send it asynchronously; tmqPollCb handles the response.
//
// On any failure before the request reaches the transport, the buffers allocated here are released and
// handleErrorBeforePoll() rolls the vgroup back to IDLE; its return value (-1) is propagated.
// After a successful hand-off nothing is freed here: the serialized request and the callback parameter are attached
// to the send info passed to asyncSendMsgToServer.
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq      req = {0};
  char*           pReqBuf = NULL;
  SMqPollCbParam* pCbParam = NULL;
  SMsgSendInfo*   pSendInfo = NULL;
  int64_t         transporterId = 0;
  char            offsetFormatBuf[80];

  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // the first serializer call (NULL buffer) is used to size the message
  int32_t reqLen = tSerializeSMqPollReq(NULL, 0, &req);
  if (reqLen < 0) {
    goto _pollFailed;
  }

  pReqBuf = taosMemoryCalloc(1, reqLen);
  if (pReqBuf == NULL || tSerializeSMqPollReq(pReqBuf, reqLen, &req) < 0) {
    goto _pollFailed;
  }

  pCbParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  if (pCbParam == NULL) {
    goto _pollFailed;
  }
  pCbParam->refId = pTmq->refId;
  pCbParam->epoch = pTmq->epoch;
  pCbParam->pVg = pVg;
  pCbParam->pTopic = pTopic;
  pCbParam->vgId = pVg->vgId;
  pCbParam->requestId = req.reqId;

  pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    goto _pollFailed;
  }
  pSendInfo->msgInfo = (SDataBuf){.pData = pReqBuf, .len = reqLen, .handle = NULL};
  pSendInfo->requestId = req.reqId;
  pSendInfo->requestObjRefId = 0;
  pSendInfo->param = pCbParam;
  pSendInfo->fp = tmqPollCb;
  pSendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);
  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pSendInfo);

  pVg->pollCnt++;
  pVg->seekUpdated = false;  // reset this flag.
  pTmq->pollCnt++;
  return TSDB_CODE_SUCCESS;

_pollFailed:
  taosMemoryFree(pCbParam);
  taosMemoryFree(pReqBuf);
  return handleErrorBeforePoll(pVg, pTmq);
}

1717
// broadcast the poll request to all related vnodes
H
Haojun Liao 已提交
1718
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1719
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1720
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
1721 1722

  for (int i = 0; i < numOfTopics; i++) {
X
Xiaoyu Wang 已提交
1723
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
X
Xiaoyu Wang 已提交
1724
    int32_t         numOfVg = taosArrayGetSize(pTopic->vgs);
1725 1726

    for (int j = 0; j < numOfVg; j++) {
X
Xiaoyu Wang 已提交
1727
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
X
Xiaoyu Wang 已提交
1728 1729 1730
      if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) {  // less than 100ms
        tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
                 tmq->epoch, pVg->vgId);
H
Haojun Liao 已提交
1731 1732 1733
        continue;
      }

1734
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1735
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1736
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
H
Haojun Liao 已提交
1737
        tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
X
Xiaoyu Wang 已提交
1738
                 pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1739 1740
        continue;
      }
1741

L
Liu Jicong 已提交
1742
      atomic_store_32(&pVg->vgSkipCnt, 0);
1743 1744 1745
      int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
D
dapan1121 已提交
1746
      }
X
Xiaoyu Wang 已提交
1747 1748
    }
  }
1749

1750
  tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
X
Xiaoyu Wang 已提交
1751 1752 1753
  return 0;
}

H
Haojun Liao 已提交
1754
// Handle a queued item that is not a poll response; only ask-ep responses (TMQ_MSG_TYPE__EP_RSP) are processed.
//
// - Newer epoch than the consumer's: the local endpoint info is updated from the response, the decoded response is
//   released, and *pReset is set to true.
// - Otherwise: the response is released via tmqFreeRspWrapper and *pReset is set to false.
// Returns 0 in both cases. For any other message type it returns -1 and touches neither the wrapper nor *pReset.
// The queue item itself is always freed by the caller.
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    // stale or duplicated endpoint info, nothing to apply
    tmqFreeRspWrapper(rspWrapper);
    *pReset = false;
    return 0;
  }

  SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
  doUpdateLocalEp(tmq, rspWrapper->epoch, pEpRsp);
  tDeleteSMqAskEpRsp(pEpRsp);
  *pReset = true;
  return 0;
}

H
Haojun Liao 已提交
1774
// Drain the consumer's response queue until a result for the user is produced.
//
// Items are taken from tmq->qall. When it is empty, everything pending in tmq->mqueue is moved into it and one more
// attempt is made.
//
// Returns:
//   - the first result object built from a response whose epoch matches the consumer's current epoch
//     (SMqRspObj, SMqMetaRspObj or SMqTaosxRspObj);
//   - NULL once the queues are empty;
//   - NULL with terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET when an END_RSP is received.
// Poll responses with a mismatching epoch and empty data responses are released and skipped.
//
// When pollIfReset is true and an ask-ep response changed the local endpoints, a new round of poll requests is
// issued immediately.
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);

  while (1) {
    SMqRspWrapper* pRspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&pRspWrapper);

    if (pRspWrapper == NULL) {
      // local batch exhausted: pull in everything queued since the last read and try once more
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pRspWrapper);
      if (pRspWrapper == NULL) {
        return NULL;
      }
    }

    tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);

    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pRspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
      return NULL;
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;

      int32_t     consumerEpoch = atomic_load_32(&tmq->epoch);
      SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;

      if (pDataRsp->head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;

        // update the epset
        if (pollRspWrapper->pEpset != NULL) {
          SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
          SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
          tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId, pVg->vgId,
                   pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
          pVg->epSet = *pollRspWrapper->pEpset;
        }

        // Take the returned offset only if it was NOT set locally by tmq_offset_seek.
        if (!pVg->seekUpdated) {
          pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
        }

        // update the status
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        // update the valid wal version range
        pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
        pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
        pVg->receivedInfoFromVnode = true;

        char buf[80];
        tFormatOffset(buf, 80, &pDataRsp->rspOffset);
        if (pDataRsp->blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   " total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          taosFreeQitem(pollRspWrapper);
        } else {  // build rsp
          int64_t    numOfRows = 0;
          SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
          tmq->totalRows += numOfRows;
          pVg->emptyBlockReceiveTs = 0;
          tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
                   pollRspWrapper->reqId);
          taosFreeQitem(pollRspWrapper);
          return pRsp;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      // todo handle the wal range and epset for each vgroup
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        if (!pVg->seekUpdated) {
          pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
        }

        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        if (!pVg->seekUpdated) {  // if offset is validate
          pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
        }

        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          continue;
        } else {
          pVg->emptyBlockReceiveTs = 0;  // reset the ts
        }

        // build rsp: plain data result when no tables were created, otherwise a data+metadata result
        void*   pRsp = NULL;
        int64_t numOfRows = 0;
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
        } else {
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
        }

        tmq->totalRows += numOfRows;

        char buf[80];
        tFormatOffset(buf, 80, &pVg->offsetInfo.currentOffset);
        // log the block count of the taosx response that was actually processed in this branch
        tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                 ", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64,
                 tmq->consumerId, pVg->vgId, buf, pollRspWrapper->taosxRsp.blockNum, numOfRows, pVg->numOfRows,
                 tmq->totalRows, pollRspWrapper->reqId);

        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else {
      tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);

      bool reset = false;
      tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
      taosFreeQitem(pRspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
    }
  }
}

1943
// Public API: fetch the next result for this consumer.
//
// A non-negative timeout bounds the wait (in ms). A negative timeout keeps waiting, re-checking the queues every
// 1000 ms.
//
// Returns a result object, or NULL when:
//   - tmq is NULL (terrno = TSDB_CODE_INVALID_PARA);
//   - the consumer is still in the INIT state;
//   - recovering the endpoint info failed;
//   - the timeout expired;
//   - an END_RSP reported that there is no committed offset (terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET).
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  if (tmq == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  void*   rspObj = NULL;
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
           timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  // sleep for a while
    return NULL;
  }

  // Re-fetch the endpoint info while the consumer is in the RECOVER state.
  // "Consumer not ready" is retried at most 40 times, 500 ms apart. Any other failure is reported to the caller
  // instead of being retried in a tight loop without any delay.
  int32_t retryCnt = 0;
  while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t code = doAskEp(tmq);
    if (code == TSDB_CODE_SUCCESS) {
      continue;  // re-check the status updated by the ep response
    }

    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY) {
      tscError("consumer:0x%" PRIx64 " failed to recover ep, code:%s", tmq->consumerId, tstrerror(code));
      terrno = code;
      return NULL;
    }

    if (++retryCnt > 40) {
      return NULL;
    }

    tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);

    if (tmqPollImpl(tmq, timeout) < 0) {
      tscDebug("consumer:0x%" PRIx64 " poll error, continue to handle the received rsp", tmq->consumerId);
    }

    // terrno is thread-local and persists across calls: clear it so the check below only reacts to an END_RSP
    // handled in this iteration, not to one left over from an earlier poll
    terrno = TSDB_CODE_SUCCESS;
    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout >= 0) {
      int64_t currentTime = taosGetTimestampMs();
      int64_t elapsedTime = currentTime - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
                 tmq->consumerId, tmq->epoch, startTime, currentTime);
        return NULL;
      }
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
    }
  }
}

2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014
// Dump consumption counters to the debug log: the consumer totals first, then the row count of every vgroup of every
// subscribed topic. Called while the consumer is being closed.
static void displayConsumeStatistics(const tmq_t* pTmq) {
  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);

  tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);

  int32_t topicIdx = 0;
  while (topicIdx < topicCnt) {
    SMqClientTopic* pCurTopic = taosArrayGet(pTmq->clientTopics, topicIdx);
    tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, topicIdx);

    int32_t vgCnt = taosArrayGetSize(pCurTopic->vgs);
    for (int32_t vgIdx = 0; vgIdx < vgCnt; vgIdx++) {
      const SMqClientVg* pCurVg = taosArrayGet(pCurTopic->vgs, vgIdx);
      tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pCurTopic->topicName, vgIdx, pCurVg->vgId, pCurVg->numOfRows);
    }

    topicIdx++;
  }

  tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
2020

2021
// Public API: close a consumer.
//
// Steps when the consumer is READY:
//   1. If auto-commit is enabled, commit all offsets synchronously. A failed commit aborts the close and its code is
//      returned.
//   2. Unsubscribe by subscribing to a newly created (empty) topic list. This is retried a few times, 500 ms apart,
//      while the mnode reports the consumer as not ready.
// In every other state the consumer is closed directly. Finally the consumer's reference is removed from the
// global ref set.
//
// Returns:
//   - 0 on success;
//   - TSDB_CODE_INVALID_PARA when tmq is NULL;
//   - TSDB_CODE_OUT_OF_MEMORY when the empty topic list cannot be created;
//   - the commit error code when the auto-commit fails.
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  tscDebug("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
  displayConsumeStatistics(tmq);

  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
    // if auto commit is set, commit before close consumer. Otherwise, do nothing.
    if (tmq->autoCommit) {
      int32_t rsp = tmq_commit_sync(tmq, NULL);
      if (rsp != 0) {
        return rsp;
      }
    }

    tmq_list_t* lst = tmq_list_new();
    if (lst == NULL) {
      tscError("consumer:0x%" PRIx64 " failed to create empty topic list for unsubscribe", tmq->consumerId);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    int32_t retryCnt = 0;
    while (1) {
      int32_t rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }
      retryCnt++;
      taosMsleep(500);
    }

    tmq_list_destroy(lst);
  } else {
    tscWarn("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
2054

L
Liu Jicong 已提交
2055 2056
// Public API: translate an error code into a message.
// 0 maps to "success", -1 to "fail"; every other code is described by tstrerror().
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
2064

L
Liu Jicong 已提交
2065 2066 2067 2068 2069
// Public API: classify a result returned by tmq_consumer_poll.
// Returns TMQ_RES_INVALID for NULL or non-TMQ results; the TD_RES_* checks would otherwise dereference NULL.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (res == NULL) {
    return TMQ_RES_INVALID;
  }

  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  } else if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  } else {
    return TMQ_RES_INVALID;
  }
}

L
Liu Jicong 已提交
2077
// Public API: topic name of a TMQ result (data, meta or metadata).
//
// The stored topic name is qualified as "<prefix>.<name>"; the part after the first '.' is returned. If the stored
// name has no '.', the whole name is returned instead of computing NULL + 1 (undefined behaviour).
// Returns NULL for a NULL or non-TMQ result.
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* pDot = strchr(fullName, '.');
  return (pDot != NULL) ? pDot + 1 : fullName;
}

L
Liu Jicong 已提交
2092 2093 2094 2095
// Public API: database name of a TMQ result (data, meta or metadata).
//
// The stored database name is qualified as "<prefix>.<name>"; the part after the first '.' is returned. If the stored
// name has no '.', the whole name is returned instead of computing NULL + 1 (undefined behaviour).
// Returns NULL for a NULL or non-TMQ result.
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* pDot = strchr(fullName, '.');
  return (pDot != NULL) ? pDot + 1 : fullName;
}

L
Liu Jicong 已提交
2107 2108 2109 2110
// Public API: id of the vgroup a TMQ result (data, meta or metadata) came from.
// Returns -1 for a NULL or non-TMQ result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (res == NULL) {
    return -1;
  }

  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
2121

2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144
// Public API: WAL version of the response offset carried by a TMQ result (data, meta or metadata).
// Returns -1 for:
//   - a NULL or non-TMQ result;
//   - an offset that is not of type TMQ_OFFSET__LOG (e.g. data read from tsdb has no valid offset info).
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (res == NULL) {
    return -1;
  }

  const STqOffsetVal* pOffset = NULL;
  if (TD_RES_TMQ(res)) {
    pOffset = &((SMqRspObj*)res)->rsp.rspOffset;
  } else if (TD_RES_TMQ_META(res)) {
    pOffset = &((SMqMetaRspObj*)res)->metaRsp.rspOffset;
  } else if (TD_RES_TMQ_METADATA(res)) {
    pOffset = &((SMqTaosxRspObj*)res)->rsp.rspOffset;
  }

  if (pOffset != NULL && pOffset->type == TMQ_OFFSET__LOG) {
    return pOffset->version;
  }

  // data from tsdb, no valid offset info
  return -1;
}

L
Liu Jicong 已提交
2145 2146 2147 2148 2149 2150 2151
// Public API: name of the table for the block that the result's iterator (resIter) currently points at.
// Only data and metadata results carry table names. Returns NULL when:
//   - res is NULL or of another type;
//   - table names were not requested or are missing;
//   - the iterator is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
  }

  return NULL;
}
2163

2164 2165 2166 2167
// Public API: commit offsets asynchronously and report the outcome through cb/param.
// With pRes == NULL the offsets of all vgroups are committed; otherwise only the offset carried by pRes.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (pRes != NULL) {  // commit the offset of a single result
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param);
    return;
  }

  asyncCommitAllOffsets(tmq, cb, param);
}

2172
// Commit-completion callback used by tmq_commit_sync.
// param is the caller's SSyncCommitInfo; the result code is stored in it before the waiting thread is released.
static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSyncInfo = param;

  pSyncInfo->code = code;
  tsem_post(&pSyncInfo->sem);
}
2177

2178 2179 2180 2181 2182 2183 2184 2185 2186
// Public API: commit offsets and block until the commit completes.
// With pRes == NULL the offsets of all vgroups are committed; otherwise only the offset carried by pRes.
// Returns the commit result code, TSDB_CODE_INVALID_PARA for a NULL consumer, or TSDB_CODE_OUT_OF_MEMORY when the
// wait context cannot be allocated.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  if (tmq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to malloc sync commit info", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  if (pRes == NULL) {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
  } else {
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo);
  }

  // commitCallBackFn stores the result in pInfo->code before posting the semaphore
  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  tscDebug("consumer:0x%" PRIx64 " sync commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}

// Ask-ep callback used by doAskEp.
//
// On success, pDataBuf->pData contains an SMqRspHead followed by an encoded SMqAskEpRsp. The response is decoded, and
// the local endpoint info is updated with the epoch taken from the header. In every case the result code is stored
// in the SAskEpInfo passed as param, and its semaphore is posted to release the waiting thread.
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  SAskEpInfo* pInfo = param;
  pInfo->code = code;

  if (code == TSDB_CODE_SUCCESS) {
    SMqRspHead* head = pDataBuf->pData;

    // zero-initialise so tDeleteSMqAskEpRsp never sees indeterminate members if decoding leaves any of them unset
    SMqAskEpRsp rsp = {0};
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
    doUpdateLocalEp(pTmq, head->epoch, &rsp);
    tDeleteSMqAskEpRsp(&rsp);
  }

  tsem_post(&pInfo->sem);
}

// Ask-ep callback that does not update anything itself.
//
// On success, pDataBuf->pData contains an SMqRspHead followed by an encoded SMqAskEpRsp. The response is decoded into
// a TMQ_MSG_TYPE__EP_RSP wrapper tagged with the header's epoch, and pushed onto pTmq->mqueue for the polling thread.
// On failure, or if the wrapper cannot be allocated, terrno is set and nothing is queued.
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return;
  }

  SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
  if (pWrapper == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  SMqRspHead* head = pDataBuf->pData;

  pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
  pWrapper->epoch = head->epoch;

  // msg is an SMqAskEpRsp, not an SMqRspHead. Only the payload after the header is decoded into it; the epoch is
  // carried separately above. The header bytes used to be memcpy'd into msg, which was both pointless (decoding
  // overwrites msg) and able to write past the end of the queue item when SMqAskEpRsp is the smaller type.
  memset(&pWrapper->msg, 0, sizeof(pWrapper->msg));
  tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg);

  taosWriteQitem(pTmq->mqueue, pWrapper);
}

// Synchronously ask the mnode for the consumer's current endpoints and apply them (via updateEpCallbackFn).
// Returns the result code reported to the callback, or TSDB_CODE_OUT_OF_MEMORY if the wait context cannot be
// allocated.
int32_t doAskEp(tmq_t* pTmq) {
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc ask ep info", pTmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);

  // updateEpCallbackFn stores the result in pInfo->code and then posts the semaphore
  asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
  tsem_wait(&pInfo->sem);

  int32_t code = pInfo->code;
  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);
  return code;
}

/*
 * Send a TDMT_MND_TMQ_ASK_EP request for pTmq to the mnode without waiting for the reply.
 *
 * If serializing or allocating the request fails locally, askEpFn(pTmq, <error code>, NULL, param) is called
 * before returning. Otherwise askEpFn and param are stored in an SMqAskEpCbParam that is attached to the
 * message with askEpCallbackFn as its response handler. Presumably askEpCallbackFn forwards to askEpFn; it is
 * not visible here — confirm.
 */
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
  SMqAskEpReq req = {0};
  req.consumerId = pTmq->consumerId;
  req.epoch = pTmq->epoch;
  // bounded copy that always NUL-terminates (was an unchecked strcpy)
  tstrncpy(req.cgroup, pTmq->groupId, sizeof(req.cgroup));

  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
    taosMemoryFree(pReq);

    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pReq);

    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  pParam->pUserFn = askEpFn;
  pParam->pParam = param;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = askEpCallbackFn;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet   epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  uint64_t reqId = sendInfo->requestId;  // read now: sendInfo may already be released once the send returns
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, reqId);

  int64_t transporterId = 0;
  int32_t code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): only logged. Whether askEpFn still runs after a failed send depends on the transport layer,
    // so invoking it here could call it twice.
    tscError("consumer:0x%" PRIx64 " failed to send ask ep req, reqId:0x%" PRIx64 ", code:%s", pTmq->consumerId,
             reqId, tstrerror(code));
  }
}

/*
 * Write the key "<topicName>:<vg>" into dst and return the number of characters written (excluding the
 * terminating NUL). dst must be large enough; no bound is enforced here.
 */
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

/*
 * Finish a commit once its last vnode response has arrived.
 *
 * If the consumer referenced by pParamSet->refId can still be acquired, it invokes
 * pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam) and then releases the reference.
 * pParamSet is freed on both paths.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer is gone. In that
 * case the callback is not invoked.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  int64_t consumerRef = pParamSet->refId;
  tmq_t*  pConsumer = taosAcquireRef(tmqMgmt.rsetId, consumerRef);

  if (pConsumer != NULL) {
    // all waiting responses have been received
    pParamSet->callbackFn(pConsumer, pParamSet->code, pParamSet->userParam);
    taosMemoryFree(pParamSet);

    taosReleaseRef(tmqMgmt.rsetId, consumerRef);
    return 0;
  }

  taosMemoryFree(pParamSet);
  terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
  return -1;
}

2339
/*
 * Record one commit response for pParamSet.
 *
 * Atomically decrements pParamSet->waitingRspNum. The caller that brings it to zero finishes the commit via
 * tmqCommitDone(); every other caller only logs how many responses remain. consumerId, pTopic and vgId are used
 * for logging only.
 */
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);

  if (remaining != 0) {
    tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remaining);
    return;
  }

  tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  tmqCommitDone(pParamSet);
}
2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371

/*
 * Advance the block iterator of a TMQ data response and load the next block into pRspObj->resInfo.
 *
 * When the response carries per-block schemas (rsp.withSchema), the block's schema is installed first and the
 * row/column/length/conversion buffers left from the previous block are released.
 *
 * Returns &pRspObj->resInfo, or NULL once all rsp.blockNum blocks have been visited.
 */
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
  SMqRspObj* pRspObj = (SMqRspObj*)res;

  pRspObj->resIter += 1;
  if (pRspObj->resIter >= pRspObj->rsp.blockNum) {
    return NULL;
  }

  SReqResultInfo*    pResInfo = &pRspObj->resInfo;
  SRetrieveTableRsp* pBlock = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);

  if (pRspObj->rsp.withSchema) {
    SSchemaWrapper* pSchemaWrapper = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
    setResSchemaInfo(pResInfo, pSchemaWrapper->pSchema, pSchemaWrapper->nCols);

    // NOTE(review): presumably re-allocated for the new schema inside setQueryResultFromRsp() — confirm
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    taosMemoryFreeClear(pResInfo->convertJson);
  }

  setQueryResultFromRsp(pResInfo, pBlock, convertUcs4, false);
  return pResInfo;
}

2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394
/*
 * Response callback for one TDMT_VND_TMQ_VG_WALINFO request sent by tmq_get_topic_assignment().
 *
 * On success it decodes the SMqDataRsp following the SMqRspHead and appends one tmq_topic_assignment to
 * pCommon->pList under pCommon->mutex:
 *   - [begin, end] from the head's walsver/walever;
 *   - currentOffset from rsp.rspOffset.version.
 * On failure (transport error or undecodable body) it stores an error code in pCommon->code.
 *
 * The callback that accounts for the last of pParam->totalReq responses posts pCommon->rsp. It frees its own
 * pParam and the response buffers it was handed.
 */
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon*       pCommon = pParam->pCommon;

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
    pCommon->code = code;
  } else {
    SMqDataRsp rsp = {0};
    SDecoder   decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    int32_t decodeCode = tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);

    if (decodeCode < 0) {
      // do not publish an offset read from a partially decoded response
      tscError("consumer:0x%" PRIx64 " failed to decode the wal info from vgId:%d for topic:%s",
               pCommon->consumerId, pParam->vgId, pCommon->pTopicName);
      pCommon->code = TSDB_CODE_INVALID_MSG;
    } else {
      SMqRspHead* pHead = pMsg->pData;

      tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                         .end = pHead->walever,
                                         .currentOffset = rsp.rspOffset.version,
                                         .vgId = pParam->vgId};

      taosThreadMutexLock(&pCommon->mutex);
      taosArrayPush(pCommon->pList, &assignment);
      taosThreadMutexUnlock(&pCommon->mutex);
    }
    // NOTE(review): if a WALINFO response could ever carry data blocks (rsp.blockNum != 0), the arrays allocated
    // by tDecodeMqDataRsp would need to be released here as well.
  }

  // Count this response only after it has been fully recorded. Once the last response is counted, the waiter in
  // tmq_get_topic_assignment() wakes up and destroys pCommon, so no callback may touch pCommon afterwards.
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == pParam->totalReq) {
    tsem_post(&pCommon->rsp);
  }

  taosMemoryFree(pParam);
  // the response copy and epset are handed to the callback, which must release them
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

/*
 * Tear down an SMqVgCommon and everything it owns:
 *   - the collected assignment array;
 *   - the wake-up semaphore;
 *   - the mutex guarding the array;
 *   - the duplicated topic name;
 *   - the struct itself.
 * pVgCommon must not be NULL.
 */
static void destroyCommonInfo(SMqVgCommon* pVgCommon) {
  // collected results
  taosArrayDestroy(pVgCommon->pList);

  // synchronization primitives
  tsem_destroy(&pVgCommon->rsp);
  taosThreadMutexDestroy(&pVgCommon->mutex);

  // owned strings and the container itself
  taosMemoryFree(pVgCommon->pTopicName);
  taosMemoryFree(pVgCommon);
}

H
Haojun Liao 已提交
2418
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
H
Haojun Liao 已提交
2419
                                 int32_t* numOfAssignment) {
H
Haojun Liao 已提交
2420 2421 2422
  *numOfAssignment = 0;
  *assignment = NULL;

2423
  int32_t accId = tmq->pTscObj->acctId;
2424
  char    tname[128] = {0};
2425 2426 2427
  sprintf(tname, "%d.%s", accId, pTopicName);

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
H
Haojun Liao 已提交
2428 2429 2430 2431 2432 2433
  if (pTopic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // in case of snapshot is opened, no valid offset will return
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
2434 2435 2436 2437 2438 2439 2440 2441

  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
    return TSDB_CODE_OUT_OF_MEMORY;
  }

2442 2443
  bool needFetch = false;

H
Haojun Liao 已提交
2444 2445
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
2446
    if (!pClientVg->receivedInfoFromVnode) {
2447 2448 2449
      needFetch = true;
      break;
    }
H
Haojun Liao 已提交
2450 2451 2452 2453 2454 2455 2456 2457 2458 2459

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
      pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
    } else {
      pAssignment->currentOffset = 0;
    }

    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
2460
    pAssignment->vgId = pClientVg->vgId;
H
Haojun Liao 已提交
2461 2462
  }

2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544
  if (needFetch) {
    SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
    if (pCommon == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return terrno;
    }

    pCommon->pList= taosArrayInit(4, sizeof(tmq_topic_assignment));
    tsem_init(&pCommon->rsp, 0, 0);
    taosThreadMutexInit(&pCommon->mutex, 0);
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
    pCommon->consumerId = tmq->consumerId;

    terrno = TSDB_CODE_OUT_OF_MEMORY;
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);

      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
      if (pParam == NULL) {
        destroyCommonInfo(pCommon);
        return terrno;
      }

      pParam->epoch = tmq->epoch;
      pParam->vgId = pClientVg->vgId;
      pParam->totalReq = *numOfAssignment;
      pParam->pCommon = pCommon;

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);

      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        taosMemoryFree(pParam);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      char* msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        taosMemoryFree(pParam);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
        taosMemoryFree(pParam);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pParam);
        taosMemoryFree(msg);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
      sendInfo->requestId = req.reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqGetWalInfoCb;
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

      int64_t transporterId = 0;
      char    offsetFormatBuf[80];
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);

      tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
               tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
    }

    tsem_wait(&pCommon->rsp);
    int32_t code = pCommon->code;

    terrno = code;
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(*assignment);
2545
      *assignment = NULL;
2546 2547 2548 2549 2550 2551 2552 2553 2554
      *numOfAssignment = 0;
    } else {
      int32_t num = taosArrayGetSize(pCommon->pList);
      for(int32_t i = 0; i < num; ++i) {
        (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
      }
      *numOfAssignment = num;
    }

2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
      tmq_topic_assignment* p = &(*assignment)[j];

      for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
        if (pClientVg->vgId != p->vgId) {
          continue;
        }

        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;

        pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;

        char offsetBuf[80] = {0};
        tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);

        tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);

        pOffsetInfo->walVerBegin = p->begin;
        pOffsetInfo->walVerEnd = p->end;
        pOffsetInfo->currentOffset.version = p->currentOffset;
        pOffsetInfo->committedOffset.version = p->currentOffset;
      }
    }

2580 2581 2582 2583 2584
    destroyCommonInfo(pCommon);
    return code;
  } else {
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
2585 2586
}

2587
/*
 * Move the consume position of one vgroup of a subscribed topic to WAL version `offset` and push it to the vnode.
 *
 * Only allowed when the vgroup's current offset is a WAL-version (TMQ_OFFSET__LOG) offset, and only for
 * offsets within the cached [walVerBegin, walVerEnd] range. The local offset is updated first, then a
 * TDMT_VND_TMQ_SEEK_TO_OFFSET request is sent via asyncCommitOffset(), and this call blocks until
 * commitCallBackFn posts the result.
 *
 * Returns:
 *   - TSDB_CODE_INVALID_PARA for invalid arguments;
 *   - TSDB_CODE_OUT_OF_MEMORY;
 *   - otherwise the code reported for the seek request.
 */
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  if (pTopicName == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name, null", tmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  // pTopicName comes from the API caller: bound the write (was an unchecked sprintf into a 128-byte buffer)
  char    tname[128] = {0};
  int32_t nameLen = snprintf(tname, sizeof(tname), "%d.%s", tmq->pTscObj->acctId, pTopicName);
  if (nameLen < 0 || nameLen >= (int32_t)sizeof(tname)) {
    tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqClientVg* pVg = NULL;
  int32_t      numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg->vgId == vgId) {
      pVg = pClientVg;
      break;
    }
  }

  if (pVg == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
    return TSDB_CODE_INVALID_PARA;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;

  int32_t type = pOffsetInfo->currentOffset.type;
  if (type != TMQ_OFFSET__LOG) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
    return TSDB_CODE_INVALID_PARA;
  }

  if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) {
    tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
             tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
    return TSDB_CODE_INVALID_PARA;
  }

  // update the offset, and then commit to vnode (the offset type was verified to be TMQ_OFFSET__LOG above)
  pOffsetInfo->currentOffset.version = offset;
  pOffsetInfo->committedOffset.version = INT64_MIN;
  pVg->seekUpdated = true;

  SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
  tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));

  tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);

  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId,
             tstrerror(code));
  }

  return code;
}