clientTmq.c 85.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22
#include "tdef.h"
#include "tglobal.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

X
Xiaoyu Wang 已提交
27 28
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
29

30 31
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);

X
Xiaoyu Wang 已提交
32
struct SMqMgmt {
33 34 35
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
36
};
L
Liu Jicong 已提交
37

X
Xiaoyu Wang 已提交
38 39
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
40
static struct SMqMgmt tmqMgmt = {0};
41

L
Liu Jicong 已提交
42 43 44 45 46 47
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
48 49 50
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
51 52
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
53
struct tmq_list_t {
L
Liu Jicong 已提交
54
  SArray container;
L
Liu Jicong 已提交
55
};
L
Liu Jicong 已提交
56

L
Liu Jicong 已提交
57
struct tmq_conf_t {
58 59 60 61 62 63 64 65
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
66 67 68 69 70
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
71
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
72
  void*          commitCbUserParam;
L
Liu Jicong 已提交
73 74 75
};

struct tmq_t {
76 77 78 79 80 81 82 83 84 85
  int64_t        refId;
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
  int8_t         withTbName;
  int8_t         useSnapshot;
  int8_t         autoCommit;
  int32_t        autoCommitInterval;
  int32_t        resetOffsetCfg;
  uint64_t       consumerId;
  bool           hbBgEnable;
L
Liu Jicong 已提交
86 87
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
88 89 90 91

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
92 93
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
94
  int32_t epSkipCnt;
L
Liu Jicong 已提交
95
#endif
96
  // poll info
X
Xiaoyu Wang 已提交
97 98
  int64_t pollCnt;
  int64_t totalRows;
L
Liu Jicong 已提交
99

L
Liu Jicong 已提交
100
  // timer
X
Xiaoyu Wang 已提交
101 102 103 104 105 106 107 108 109 110
  tmr_h       hbLiveTimer;
  tmr_h       epTimer;
  tmr_h       reportTimer;
  tmr_h       commitTimer;
  STscObj*    pTscObj;       // connection
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of rsp
  STaosQall*  qall;
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
  tsem_t      rspSem;
L
Liu Jicong 已提交
111 112
};

113 114
typedef struct SAskEpInfo {
  int32_t code;
H
Haojun Liao 已提交
115
  tsem_t  sem;
116 117
} SAskEpInfo;

X
Xiaoyu Wang 已提交
118 119 120 121 122 123 124 125
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
126
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
127
  TMQ_CONSUMER_STATUS__RECOVER,
L
Liu Jicong 已提交
128 129
};

L
Liu Jicong 已提交
130
enum {
131
  TMQ_DELAYED_TASK__ASK_EP = 1,
L
Liu Jicong 已提交
132 133 134 135
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

H
Haojun Liao 已提交
136
typedef struct SVgOffsetInfo {
L
Liu Jicong 已提交
137 138
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
H
Haojun Liao 已提交
139 140 141 142 143 144 145 146 147 148 149
  int64_t      walVerBegin;
  int64_t      walVerEnd;
} SVgOffsetInfo;

typedef struct {
  int64_t       pollCnt;
  int64_t       numOfRows;
  SVgOffsetInfo offsetInfo;
  int32_t       vgId;
  int32_t       vgStatus;
  int32_t       vgSkipCnt;            // here used to mark the slow vgroups
150
  bool          receivedInfoFromVnode;// has already received info from vnode
H
Haojun Liao 已提交
151
  int64_t       emptyBlockReceiveTs;  // once empty block is received, idle for ignoreCnt then start to poll data
152
  bool          seekUpdated;          // offset is updated by seek operator, therefore, not update by vnode rsp.
H
Haojun Liao 已提交
153
  SEpSet        epSet;
154 155
} SMqClientVg;

L
Liu Jicong 已提交
156
typedef struct {
157 158 159
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
160
  SSchemaWrapper schema;
161 162
} SMqClientTopic;

L
Liu Jicong 已提交
163 164
typedef struct {
  int8_t          tmqRspType;
165
  int32_t         epoch;  // epoch can be used to guard the vgHandle
166
  int32_t         vgId;
L
Liu Jicong 已提交
167 168
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
H
Haojun Liao 已提交
169
  uint64_t        reqId;
170
  SEpSet*         pEpset;
L
Liu Jicong 已提交
171
  union {
L
Liu Jicong 已提交
172 173
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
174
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
175
  };
L
Liu Jicong 已提交
176 177
} SMqPollRspWrapper;

L
Liu Jicong 已提交
178
typedef struct {
179 180
  int64_t refId;
  int32_t epoch;
L
Liu Jicong 已提交
181 182
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
183
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
184

L
Liu Jicong 已提交
185
typedef struct {
186 187 188 189
  int64_t          refId;
  int32_t          epoch;
  void*            pParam;
  __tmq_askep_fn_t pUserFn;
190 191
} SMqAskEpCbParam;

L
Liu Jicong 已提交
192
typedef struct {
193 194
  int64_t         refId;
  int32_t         epoch;
L
Liu Jicong 已提交
195
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
196
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
197
  int32_t         vgId;
X
Xiaoyu Wang 已提交
198
  uint64_t        requestId;  // request id for debug purpose
X
Xiaoyu Wang 已提交
199
} SMqPollCbParam;
200

201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
typedef struct SMqVgCommon {
  tsem_t        rsp;
  int32_t       numOfRsp;
  SArray*       pList;
  TdThreadMutex mutex;
  int64_t       consumerId;
  char*         pTopicName;
  int32_t       code;
} SMqVgCommon;

typedef struct SMqVgWalInfoParam {
  int32_t      vgId;
  int32_t      epoch;
  int32_t      totalReq;
  SMqVgCommon* pCommon;
} SMqVgWalInfoParam;

218
typedef struct {
219 220
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
221 222
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
223
  int32_t        code;
224
  tmq_commit_cb* callbackFn;
L
Liu Jicong 已提交
225 226
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
X
Xiaoyu Wang 已提交
227
  void* userParam;
228 229 230 231
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
232
  SMqVgOffset*         pOffset;
H
Haojun Liao 已提交
233 234 235
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
  int32_t              vgId;
  tmq_t*               pTmq;
236
} SMqCommitCbParam;
237

238 239 240 241 242
typedef struct SSyncCommitInfo {
  tsem_t  sem;
  int32_t code;
} SSyncCommitInfo;

243
static int32_t doAskEp(tmq_t* tmq);
244 245
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
246
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
247
                               int32_t index, int32_t totalVgroups, int32_t type);
X
Xiaoyu Wang 已提交
248 249 250
static void    commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void    asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void    addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
251

252
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
253
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
254 255 256 257 258
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

259
  conf->withTbName = false;
L
Liu Jicong 已提交
260
  conf->autoCommit = true;
261
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
262
  conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
263
  conf->hbBgEnable = true;
264

265 266 267
  return conf;
}

L
Liu Jicong 已提交
268
// Releases a configuration object together with the strings it owns. A NULL conf is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  // ip/user/pass are heap copies made by tmq_conf_set(); free them in that order.
  char* ownedStrings[] = {conf->ip, conf->user, conf->pass};
  for (size_t i = 0; i < sizeof(ownedStrings) / sizeof(ownedStrings[0]); ++i) {
    if (ownedStrings[i] != NULL) {
      taosMemoryFree(ownedStrings[i]);
    }
  }

  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
284
  if (strcasecmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
285
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
286
    return TMQ_CONF_OK;
287
  }
L
Liu Jicong 已提交
288

289
  if (strcasecmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
290
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
291 292
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
293

294 295
  if (strcasecmp(key, "enable.auto.commit") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
296
      conf->autoCommit = true;
L
Liu Jicong 已提交
297
      return TMQ_CONF_OK;
298
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
299
      conf->autoCommit = false;
L
Liu Jicong 已提交
300 301 302 303
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
304
  }
L
Liu Jicong 已提交
305

306
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
307
    conf->autoCommitInterval = taosStr2int64(value);
L
Liu Jicong 已提交
308 309 310
    return TMQ_CONF_OK;
  }

311 312 313
  if (strcasecmp(key, "auto.offset.reset") == 0) {
    if (strcasecmp(value, "none") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
L
Liu Jicong 已提交
314
      return TMQ_CONF_OK;
315 316
    } else if (strcasecmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
L
Liu Jicong 已提交
317
      return TMQ_CONF_OK;
318 319
    } else if (strcasecmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
L
Liu Jicong 已提交
320 321 322 323 324
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
325

326 327
  if (strcasecmp(key, "msg.with.table.name") == 0) {
    if (strcasecmp(value, "true") == 0) {
328
      conf->withTbName = true;
L
Liu Jicong 已提交
329
      return TMQ_CONF_OK;
330
    } else if (strcasecmp(value, "false") == 0) {
331
      conf->withTbName = false;
L
Liu Jicong 已提交
332
      return TMQ_CONF_OK;
333 334 335 336 337
    } else {
      return TMQ_CONF_INVALID;
    }
  }

338 339
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
340
      conf->snapEnable = true;
341
      return TMQ_CONF_OK;
342
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
343
      conf->snapEnable = false;
344 345 346 347 348 349
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

350
  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
351
    conf->snapBatchSize = taosStr2int64(value);
L
Liu Jicong 已提交
352 353 354
    return TMQ_CONF_OK;
  }

355
  if (strcasecmp(key, "enable.heartbeat.background") == 0) {
X
Xiaoyu Wang 已提交
356 357 358 359 360 361 362 363 364 365
    //    if (strcasecmp(value, "true") == 0) {
    //      conf->hbBgEnable = true;
    //      return TMQ_CONF_OK;
    //    } else if (strcasecmp(value, "false") == 0) {
    //      conf->hbBgEnable = false;
    //      return TMQ_CONF_OK;
    //    } else {
    tscError("the default value of enable.heartbeat.background is true, can not be seted");
    return TMQ_CONF_INVALID;
    //    }
L
Liu Jicong 已提交
366 367
  }

368
  if (strcasecmp(key, "td.connect.ip") == 0) {
369
    conf->ip = taosStrdup(value);
L
Liu Jicong 已提交
370 371
    return TMQ_CONF_OK;
  }
372

373
  if (strcasecmp(key, "td.connect.user") == 0) {
374
    conf->user = taosStrdup(value);
L
Liu Jicong 已提交
375 376
    return TMQ_CONF_OK;
  }
377

378
  if (strcasecmp(key, "td.connect.pass") == 0) {
379
    conf->pass = taosStrdup(value);
L
Liu Jicong 已提交
380 381
    return TMQ_CONF_OK;
  }
382

383
  if (strcasecmp(key, "td.connect.port") == 0) {
384
    conf->port = taosStr2int64(value);
L
Liu Jicong 已提交
385 386
    return TMQ_CONF_OK;
  }
387

388
  if (strcasecmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
389 390 391
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
392
  return TMQ_CONF_UNKNOWN;
393 394
}

X
Xiaoyu Wang 已提交
395
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
396

L
Liu Jicong 已提交
397 398
// Appends a private heap copy of `src` to `list`; the list owns the copy and frees it in
// tmq_list_destroy(). Returns 0 on success, -1 for a NULL list, a NULL/empty src, or an
// allocation failure (in which case the list is left unchanged and nothing leaks).
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = taosStrdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(&list->container, &topic) == NULL) {
    taosMemoryFree(topic);  // the array did not take ownership
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
405
// Frees every topic string in `list` and then the list itself. A NULL list is ignored
// (previously `&list->container` was computed from a NULL pointer).
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) {
    return;
  }
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
410 411 412 413 414 415 416 417 418 419
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

// Returns the list's internal array of topic strings (tmq_list_get_size() entries), or NULL for a
// NULL list. The pointer refers to the list's own storage; callers must not free it.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  if (list == NULL) {
    return NULL;
  }
  return list->container.pData;
}

X
Xiaoyu Wang 已提交
420 421
// Looks up vgroup `vgId` of topic `pName` in pTopicList (an SArray<SMqClientTopic>).
// On return *index is the vgroup's position in the topic's vgs array (-1 if absent) and
// *numOfVgroups is the vgroup count of the last topic whose name matched (0 if none matched).
// Returns the vgroup, or NULL when it is not found.
static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
                                  int32_t* numOfVgroups) {
  int32_t topicCount = taosArrayGetSize(pTopicList);
  *index = -1;
  *numOfVgroups = 0;

  for (int32_t t = 0; t < topicCount; ++t) {
    SMqClientTopic* pCur = taosArrayGet(pTopicList, t);
    if (strcmp(pCur->topicName, pName) == 0) {
      int32_t vgCount = taosArrayGetSize(pCur->vgs);
      *numOfVgroups = vgCount;
      for (int32_t v = 0; v < vgCount; ++v) {
        SMqClientVg* pCandidate = taosArrayGet(pCur->vgs, v);
        if (pCandidate->vgId == vgId) {
          *index = v;
          return pCandidate;
        }
      }
    }
  }

  return NULL;
}
444

445 446 447
// Known limitations of the commit callback below:
// 1. epset updates: not handled here; the response of the poll request handles this automatically.
// 2. commit failures: failed commits are not retried; this still needs to be resolved.
H
Haojun Liao 已提交
448
// Response handler for one commit-offset request sent by doSendCommitMsg().
// It does the following:
//   - records a failed commit in the shared parameter set, so that the error is not silently dropped;
//   - releases the encoded offset and the response buffers;
//   - counts the request down.
// The param object itself is presumably released by the transport through paramFreeFp
// (set to taosMemoryFree in doSendCommitMsg) — confirm.
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  if (code != TSDB_CODE_SUCCESS) {
    // Failed commits are not retried; keep the error in the aggregate result instead of ignoring it.
    // NOTE(review): assumes tmqCommitDone() passes pParamSet->code to the user callback — confirm.
    tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d commit offset failed, code:%s", pParam->pTmq->consumerId,
             pParam->topicName, pParam->vgId, tstrerror(code));
    // several responses of the same set may complete concurrently
    atomic_store_32(&pParamSet->code, code);
  }

  taosMemoryFree(pParam->pOffset);
  taosMemoryFree(pBuf->pData);
  taosMemoryFree(pBuf->pEpSet);

  commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
  return 0;
}

491
// Encodes the current offset of `pVg` and sends it asynchronously to that vgroup as a
// message of type `type`; tmqCommitCb() handles the response.
// On success, the request is accounted in pParamSet (waitingRspNum / totalRspNum).
// On any failure, nothing is accounted and every allocation made here is released.
// `index` and `totalVgroups` are used only for logging.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or TSDB_CODE_INVALID_PARA (encoding failure).
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
                               int32_t index, int32_t totalVgroups, int32_t type) {
  SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset));
  if (pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOffset->consumerId = tmq->consumerId;
  pOffset->offset.val = pVg->offsetInfo.currentOffset;

  // subscription key: "<groupId><TMQ_SEPARATOR><topicName>"
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->offset.subKey, tmq->groupId, groupLen);
  pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName);

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqVgOffset, pOffset, len, code);
  if (code < 0) {
    taosMemoryFree(pOffset);  // release the offset on every failure path
    return TSDB_CODE_INVALID_PARA;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeMqVgOffset(&encoder, pOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    taosMemoryFree(buf);
    taosMemoryFree(pOffset);
    return TSDB_CODE_INVALID_PARA;
  }

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;  // freed by tmqCommitCb
  pParam->vgId = pVg->vgId;
  pParam->pTmq = tmq;

  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = type;

  // account the request before it is sent so the response cannot count down first
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
  char offsetBuf[80] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);

  char commitBuf[80] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
           tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
           totalVgroups, pMsgSendInfo->requestId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
580 581 582 583 584 585 586 587 588 589 590
// Returns the subscribed topic named `pTopicName` from tmq->clientTopics, or NULL
// (after logging an error) when the consumer has no such topic.
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  int32_t i = 0;
  while (i < numOfTopics) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, i);
    ++i;
    if (strcmp(pCandidate->topicName, pTopicName) == 0) {
      return pCandidate;
    }
  }

  tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName);
  return NULL;
}

595
// Commits the offset of the single vgroup that produced `pRes` (a data, meta or metadata result).
// `type` is the message type passed through to doSendCommitMsg().
// pCommitFp(tmq, code, userParam) is called directly when no request is sent: bad input,
// unknown topic or vgroup, unchanged offset, or send failure. Otherwise it is left to the
// commit-response path.
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) {
  char*   pTopicName = NULL;
  int32_t vgId = 0;
  int32_t code = 0;

  if (pRes == NULL || tmq == NULL) {
    pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
    return;
  }

  if (TD_RES_TMQ(pRes)) {
    SMqRspObj* pRspObj = (SMqRspObj*)pRes;
    pTopicName = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(pRes)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes;
    pTopicName = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(pRes)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes;
    pTopicName = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
    return;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);

  SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
  if (pTopic == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId,
            pTopicName, numOfTopics);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  int32_t j = 0;
  int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
  for (j = 0; j < numOfVgroups; j++) {
    SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
    if (pVg->vgId == vgId) {
      break;
    }
  }

  if (j == numOfVgroups) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId,
            vgId, numOfVgroups, pTopicName);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
  bool         offsetChanged = pVg->offsetInfo.currentOffset.type > 0 &&
                       !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset);
  if (!offsetChanged) {
    // do not perform commit, callback user function directly.
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, code, userParam);
    return;
  }

  code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, type);
  if (code != TSDB_CODE_SUCCESS) {
    // failed to commit, callback user function directly.
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, code, userParam);
    return;
  }

  // pParamSet now belongs to the response path and must not be touched here.
  // Record the sent offset as committed, as asyncCommitAllOffsets() does, so the next auto-commit
  // round does not resend it. Only genuine commit messages move the committed offset.
  if (type == TDMT_VND_TMQ_COMMIT_OFFSET) {
    pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
  }
}

678
// Commits the current offset of every vgroup of every subscribed topic whose offset moved since the
// last commit, recording each sent offset as committed.
// When no request is sent, pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam) is called directly.
// Otherwise, completion goes through commitRspCountDown() once every response has arrived
// (presumably tmqCommitDone invokes pCommitFp — confirm).
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  // init as 1 to prevent concurrency issue: responses arriving while requests are still being sent
  // cannot bring the count to zero before the final count-down below.
  pParamSet->waitingRspNum = 1;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);

    tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
             numOfVgroups);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      if (pVg->offsetInfo.currentOffset.type > 0 &&
          !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
        int32_t code =
            doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET);
        if (code != TSDB_CODE_SUCCESS) {
          // report the error actually returned; doSendCommitMsg does not set terrno
          tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
                   tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version,
                   tstrerror(code), j + 1, numOfVgroups);
          continue;
        }

        // update the offset value.
        pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.currentOffset.version, j + 1,
                 numOfVgroups);
      }
    }
  }

  // totalRspNum is only incremented on this thread, unlike waitingRspNum which responses decrement
  tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->totalRspNum,
           numOfTopics);

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // count down since waiting rsp num init as 1
  commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
}

737 738
static void generateTimedTask(int64_t refId, int32_t type) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
739
  if (tmq != NULL) {
S
Shengliang Guan 已提交
740
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
741
    *pTaskType = type;
742 743 744
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
745
  taosReleaseRef(tmqMgmt.rsetId, refId);
746 747 748 749 750
}

// Timer callback: queues an ask-ep task for the consumer whose refId is stored in `param`,
// then frees `param` (a heap-allocated int64_t owned by this timer shot).
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t* pRefId = (int64_t*)param;
  generateTimedTask(*pRefId, TMQ_DELAYED_TASK__ASK_EP);
  taosMemoryFree(pRefId);
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
755
  int64_t refId = *(int64_t*)param;
756
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
757
  taosMemoryFree(param);
L
Liu Jicong 已提交
758 759 760
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
761 762 763
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
764
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
765 766 767 768
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
769 770

  taosReleaseRef(tmqMgmt.rsetId, refId);
771
  taosMemoryFree(param);
L
Liu Jicong 已提交
772 773
}

774
// Heartbeat response handler: the content is ignored; only the response buffers are released.
// Always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
783
  int64_t refId = *(int64_t*)param;
784

X
Xiaoyu Wang 已提交
785
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
786
  if (tmq == NULL) {
L
Liu Jicong 已提交
787
    taosMemoryFree(param);
788 789
    return;
  }
D
dapan1121 已提交
790 791 792 793 794

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
795
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
796 797
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
798
    goto OVER;
D
dapan1121 已提交
799
  }
800

L
Liu Jicong 已提交
801
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
802 803
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
804
    goto OVER;
D
dapan1121 已提交
805
  }
806

D
dapan1121 已提交
807 808 809
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
810
    goto OVER;
D
dapan1121 已提交
811
  }
812 813 814 815

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
816
    goto OVER;
817
  }
818

819
  sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
820 821 822 823 824

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
825
  sendInfo->msgType = TDMT_MND_TMQ_HB;
826 827 828 829 830 831 832

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
833
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
834
  taosReleaseRef(tmqMgmt.rsetId, refId);
835 836
}

837 838
/*
 * Fallback auto-commit callback, used when the user registered none.
 * Successful commits are ignored; failures are only logged at debug level.
 * `param` is not used.
 */
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;
  }

  tscDebug("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}

843
/*
 * Drains pTmq->delayedTask and executes each queued task:
 *  - TMQ_DELAYED_TASK__ASK_EP: asks the mnode for endpoints, then re-arms the ep timer (1000 ms);
 *  - TMQ_DELAYED_TASK__COMMIT: commits all offsets through the user's commit callback (or
 *    defaultCommitCbFn), then re-arms the commit timer (pTmq->autoCommitInterval ms);
 *  - TMQ_DELAYED_TASK__REPORT: currently a no-op.
 * Every dequeued item is freed.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno) when the
 *         temporary item list cannot be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      asyncAskEp(pTmq, addToQueueCallbackFn, NULL);

      // the timer owns this refId holder
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to re-arm ep timer since out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;

      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to re-arm commit timer since out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do yet
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return TSDB_CODE_SUCCESS;
}

886
/*
 * Releases the heap-owned payload carried by a queued response wrapper, chosen by its
 * tmqRspType. The wrapper item itself is NOT freed here (callers follow up with
 * taosFreeQitem). Always returns NULL.
 */
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pEpWrapper->msg);
      break;
    }

    case TMQ_MSG_TYPE__POLL_RSP: {
      SMqPollRspWrapper* pPollWrapper = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPollWrapper->pEpset);

      taosArrayDestroyP(pPollWrapper->dataRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPollWrapper->dataRsp.blockDataLen);
      taosArrayDestroyP(pPollWrapper->dataRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPollWrapper->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPollWrapper = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPollWrapper->pEpset);

      taosMemoryFree(pPollWrapper->metaRsp.metaRsp);
      break;
    }

    case TMQ_MSG_TYPE__TAOSX_RSP: {
      SMqPollRspWrapper* pPollWrapper = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPollWrapper->pEpset);

      taosArrayDestroyP(pPollWrapper->taosxRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPollWrapper->taosxRsp.blockDataLen);
      taosArrayDestroyP(pPollWrapper->taosxRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPollWrapper->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);

      // taosx-only: buffered create-table requests
      taosArrayDestroy(pPollWrapper->taosxRsp.createTableLen);
      taosArrayDestroyP(pPollWrapper->taosxRsp.createTableReq, taosMemoryFree);
      break;
    }

    case TMQ_MSG_TYPE__END_RSP:  // carries no payload
    default:
      break;
  }

  return NULL;
}

L
Liu Jicong 已提交
921
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
922
  SMqRspWrapper* rspWrapper = NULL;
L
Liu Jicong 已提交
923
  while (1) {
L
Liu Jicong 已提交
924 925 926 927 928
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper) {
      tmqFreeRspWrapper(rspWrapper);
      taosFreeQitem(rspWrapper);
    } else {
L
Liu Jicong 已提交
929
      break;
L
Liu Jicong 已提交
930
    }
L
Liu Jicong 已提交
931 932
  }

L
Liu Jicong 已提交
933
  rspWrapper = NULL;
L
Liu Jicong 已提交
934 935
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
L
Liu Jicong 已提交
936 937 938 939 940
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper) {
      tmqFreeRspWrapper(rspWrapper);
      taosFreeQitem(rspWrapper);
    } else {
L
Liu Jicong 已提交
941
      break;
L
Liu Jicong 已提交
942
    }
L
Liu Jicong 已提交
943 944 945
  }
}

D
dapan1121 已提交
946
/*
 * RPC callback for TDMT_MND_TMQ_SUBSCRIBE. Stores the response code in the caller's
 * SMqSubscribeCbParam and wakes the thread blocked in tmq_subscribe() on rspSem.
 * Like the other RPC callbacks in this file, it releases the response buffers
 * (pData and pEpSet); previously pData was leaked.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  if (pMsg) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
  }

  // post last: pParam lives on the waiting thread's stack and may vanish right after this
  tsem_post(&pParam->rspSem);
  return 0;
}
954

L
Liu Jicong 已提交
955
/*
 * Reports the topics the consumer is currently subscribed to.
 * If *topics is NULL a new list is allocated and stored there (the caller must destroy it);
 * otherwise the names are appended to the caller's list. Each stored full topic name has
 * its prefix up to and including the first '.' stripped.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY (also in terrno) if a new list cannot be allocated.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr() returns NULL when there is no '.', and NULL + 1 is undefined behaviour;
    // fall back to the whole name in that case
    const char* pDot = strchr(topic->topicName, '.');
    tmq_list_append(*topics, (pDot != NULL) ? pDot + 1 : topic->topicName);
  }

  return 0;
}

L
Liu Jicong 已提交
966
/*
 * Unsubscribes from all topics by subscribing to an empty topic list.
 * While the mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY, it retries every 500 ms
 * (retries stop once retryCnt exceeds 5).
 *
 * @return the last tmq_subscribe() result, or TSDB_CODE_OUT_OF_MEMORY (also in terrno)
 *         when the empty list cannot be allocated.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    // tmq_subscribe dereferences the list, so never hand it NULL
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t rsp = 0;
  int32_t retryCnt = 0;
  while (1) {
    rsp = tmq_subscribe(tmq, lst);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    }

    retryCnt++;
    taosMsleep(500);
  }

  tmq_list_destroy(lst);
  return rsp;
}

984 985 986 987 988 989
static void freeClientVgImpl(void* param) {
  SMqClientTopic* pTopic = param;
  taosMemoryFreeClear(pTopic->schema.pSchema);
  taosArrayDestroy(pTopic->vgs);
}

990
void tmqFreeImpl(void* handle) {
991 992
  tmq_t*  tmq = (tmq_t*)handle;
  int64_t id = tmq->consumerId;
L
Liu Jicong 已提交
993

994
  // TODO stop timer
L
Liu Jicong 已提交
995 996 997 998
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
L
Liu Jicong 已提交
999

H
Haojun Liao 已提交
1000 1001 1002 1003 1004
  if (tmq->delayedTask) {
    taosCloseQueue(tmq->delayedTask);
  }

  taosFreeQall(tmq->qall);
1005
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
1006

1007
  taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
1008 1009
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
1010 1011

  tscDebug("consumer:0x%" PRIx64 " closed", id);
L
Liu Jicong 已提交
1012 1013
}

1014 1015 1016 1017 1018 1019 1020 1021 1022
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
1023
  if (tmqMgmt.rsetId < 0) {
1024 1025 1026 1027
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
1028
/*
 * Creates a consumer from conf. It copies the configuration, allocates the consumer's
 * queues, opens a TMQ connection and registers the consumer in the global ref set. When
 * conf->hbBgEnable is set, it also starts the 1000 ms background heartbeat timer
 * (tmqSendHbReq).
 *
 * @param conf       consumer configuration; conf->groupId must be non-empty.
 * @param errstr     currently unused.
 * @param errstrLen  currently unused.
 * @return the new consumer, or NULL on failure.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int64_t* pHbRefId = NULL;

  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf (bounded copies so an oversized source can never overrun the destination)
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId early, so every failure log below carries the real id
  pTmq->consumerId = tGenIdPI64();

  // Init the semaphore before anything that can reach _failed. tmqFreeImpl destroys
  // rspSem unconditionally, so it must never see an uninitialised one.
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->delayedTask = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  // TODO(review): an empty groupId is a parameter error, yet it is reported as out-of-memory
  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
      conf->groupId[0] == 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    goto _failed;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    goto _failed;  // rspSem is destroyed exactly once, inside tmqFreeImpl
  }

  // Allocate the heartbeat timer's refId holder before registering in the ref set.
  // That way an allocation failure can still be unwound with a plain tmqFreeImpl.
  if (pTmq->hbBgEnable) {
    pHbRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pHbRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      goto _failed;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    taosMemoryFree(pHbRefId);
    goto _failed;
  }

  if (pHbRefId != NULL) {
    *pHbRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pHbRefId, tmqMgmt.timer);
  }

  char         buf[80] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf, pTmq->hbBgEnable);

  return pTmq;

_failed:
  tmqFreeImpl(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1119
/*
 * Subscribes the consumer to the topics in topic_list (an empty list unsubscribes).
 *
 * The request is sent to the mnode and the call blocks on a local semaphore until
 * tmqSubscribeCb reports the result. Then doAskEp is retried every 500 ms for as long as
 * the mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY, up to MAX_RETRY_COUNT times. Finally
 * the ep timer and, with auto-commit on, the commit timer are started if not already running.
 *
 * @return 0 on success, otherwise an error code (TSDB_CODE_OUT_OF_MEMORY,
 *         TSDB_CODE_TSC_INTERNAL_ERROR, or the mnode's response code).
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const int32_t   MAX_RETRY_COUNT = 120 * 4;  // let's wait for 4 mins at most
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;  // previously left at 0, reporting success
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      taosMemoryFree(topicFName);  // not owned by req.topicNames yet
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  if (tlen <= 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto FAIL;
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  SMqSubscribeCbParam param = {.rspErr = 0, .refId = tmq->refId, .epoch = tmq->epoch};
  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  // param lives on this stack frame, so we must wait for the callback before returning
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
    if (retryCnt++ > MAX_RETRY_COUNT) {
      tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, max retry reached:%d", tmq->consumerId, retryCnt);
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer (the timer owns the refId holder)
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1238
/*
 * Stores the commit callback and its opaque user pointer in conf. tmq_consumer_new copies
 * both into the consumer, where the auto-commit task uses them. A NULL conf is ignored
 * instead of being dereferenced.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf == NULL) {
    return;
  }

  conf->commitCb = cb;
  conf->commitCbUserParam = param;
}
1242

1243
/*
 * RPC callback for a vnode poll request.
 *
 * pParam is always freed here. The response buffers are released on every path, except
 * pEpSet, which is handed over to the queued wrapper on success. On success the payload is
 * decoded according to its SMqRspHead::mqMsgType and queued on tmq->mqueue as an
 * SMqPollRspWrapper. On error paths the vgroup is put back to TMQ_VG_STATUS__IDLE when the
 * request's epoch is still current. rspSem is posted on every path where the consumer is
 * still alive.
 *
 * @return 0 when a response was queued or a stale-epoch response was dropped, -1 otherwise
 *         (including the TSDB_CODE_TQ_NO_COMMITTED_OFFSET end-marker path).
 */
static int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;

  int64_t         refId = pParam->refId;
  SMqClientVg*    pVg = pParam->pVg;
  SMqClientTopic* pTopic = pParam->pTopic;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t  epoch = pParam->epoch;
  int32_t  vgId = pParam->vgId;
  uint64_t requestId = pParam->requestId;

  taosMemoryFree(pParam);

  if (code != 0) {
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

    // in case of consumer mismatch, wait for 500ms and retry
    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      taosMsleep(500);
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
               tmq->consumerId);
    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pRspWrapper == NULL) {
        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
                tmq->consumerId, vgId, epoch, requestId);
        goto CREATE_MSG_FAIL;
      }

      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
    } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {  // poll data while insert
      taosMsleep(500);
    } else {
      tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
               vgId, epoch, tstrerror(code), requestId);
    }

    goto CREATE_MSG_FAIL;
  }

  // The header is read straight from the network payload below (and len - sizeof(SMqRspHead)
  // is used as a decode length), so reject a missing or truncated payload up front.
  if (pMsg->pData == NULL || (int32_t)pMsg->len < (int32_t)sizeof(SMqRspHead)) {
    tscError("consumer:0x%" PRIx64 " invalid poll rsp from vgId:%d, len:%d, epoch %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, (int32_t)pMsg->len, epoch, requestId);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < clientEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("consumer:0x%" PRIx64
            " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);

    tsem_post(&tmq->rspSem);
    taosReleaseRef(tmqMgmt.rsetId, refId);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    return 0;
  }

  if (msgEpoch != clientEpoch) {
    tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId,
            epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
  pRspWrapper->reqId = requestId;
  pRspWrapper->pEpset = pMsg->pEpSet;  // ownership moves to the wrapper
  pRspWrapper->vgId = pVg->vgId;

  pMsg->pEpSet = NULL;
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    char buf[80];
    tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset);
    tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {  // invalid rspType
    tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
  }

  taosMemoryFree(pMsg->pData);
  taosWriteQitem(tmq->mqueue, pRspWrapper);

  int32_t total = taosQueueItemSize(tmq->mqueue);
  tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
           tmq->consumerId, rspType, vgId, total, requestId);

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);

  return 0;

CREATE_MSG_FAIL:
  // only reset the vgroup if it still belongs to the epoch this request was issued in;
  // epoch is stored atomically elsewhere, so read it the same way here
  if (epoch == atomic_load_32(&tmq->epoch)) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);

  return -1;
}

H
Haojun Liao 已提交
1386 1387 1388 1389 1390
/*
 * Per-vgroup state kept while topics are rebuilt after an endpoint update: the vgroup's
 * current offset and its numOfRows counter.
 */
struct SVgroupSaveInfo {
  STqOffsetVal offset;
  int64_t      numOfRows;
};
typedef struct SVgroupSaveInfo SVgroupSaveInfo;

H
Haojun Liao 已提交
1391 1392 1393 1394 1395 1396
/*
 * Fills pTopic from one topic entry of an ask-ep response.
 * The schema is moved out of pTopicEp (pTopicEp->schema is cleared). One SMqClientVg is
 * created per vgroup. A vgroup found in pVgOffsetHashMap resumes from the saved offset and
 * row count; other vgroups start from tmq->resetOffsetCfg. If the vgroup array cannot be
 * allocated, the topic is left with vgs == NULL.
 */
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
                                   tmq_t* tmq) {
  pTopic->schema = pTopicEp->schema;
  pTopicEp->schema.nCols = 0;
  pTopicEp->schema.pSchema = NULL;

  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);

  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);

  tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
  if (pTopic->vgs == NULL) {
    // pushing into a NULL array below would dereference it
    tscError("consumer:0x%" PRIx64 ", failed to init vgroup list of topic:%s since out of memory", tmq->consumerId,
             pTopic->topicName);
    return;
  }

  for (int32_t j = 0; j < vgNumGet; j++) {
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);

    makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));

    int64_t      numOfRows = 0;
    STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
    if (pInfo != NULL) {
      offsetNew = pInfo->offset;
      numOfRows = pInfo->numOfRows;
    }

    SMqClientVg clientVg = {
        .pollCnt = 0,
        .vgId = pVgEp->vgId,
        .epSet = pVgEp->epSet,
        .vgStatus = TMQ_VG_STATUS__IDLE,
        .vgSkipCnt = 0,
        .emptyBlockReceiveTs = 0,
        .numOfRows = numOfRows,
    };

    clientVg.offsetInfo.currentOffset = offsetNew;
    clientVg.offsetInfo.committedOffset = offsetNew;
    clientVg.offsetInfo.walVerBegin = -1;
    clientVg.offsetInfo.walVerEnd = -1;
    taosArrayPush(pTopic->vgs, &clientVg);
  }
}

static void freeClientVgInfo(void* param) {
  SMqClientTopic* pTopic = param;
  if (pTopic->schema.nCols) {
    taosMemoryFreeClear(pTopic->schema.pSchema);
  }

  taosArrayDestroy(pTopic->vgs);
}

1446
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
1447 1448
  bool set = false;

1449
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
1450
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
1451

X
Xiaoyu Wang 已提交
1452 1453
  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
1454
           tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
wmmhello's avatar
wmmhello 已提交
1455 1456 1457
  if (epoch <= tmq->epoch) {
    return false;
  }
1458 1459 1460 1461 1462 1463

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

H
Haojun Liao 已提交
1464 1465
  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pVgOffsetHashMap == NULL) {
1466 1467 1468
    taosArrayDestroy(newTopics);
    return false;
  }
1469

H
Haojun Liao 已提交
1470
  // todo extract method
1471 1472 1473 1474 1475
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
wmmhello's avatar
wmmhello 已提交
1476
      tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);
1477 1478
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
H
Haojun Liao 已提交
1479 1480
        makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);

L
Liu Jicong 已提交
1481
        char buf[80];
H
Haojun Liao 已提交
1482
        tFormatOffset(buf, 80, &pVgCur->offsetInfo.currentOffset);
X
Xiaoyu Wang 已提交
1483 1484
        tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
                 vgKey, buf);
H
Haojun Liao 已提交
1485

H
Haojun Liao 已提交
1486
        SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows};
H
Haojun Liao 已提交
1487
        taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
1488 1489 1490 1491 1492 1493 1494
      }
    }
  }

  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
H
Haojun Liao 已提交
1495
    initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
1496 1497
    taosArrayPush(newTopics, &topic);
  }
1498

H
Haojun Liao 已提交
1499 1500
  taosHashCleanup(pVgOffsetHashMap);

1501
  // destroy current buffered existed topics info
1502
  if (tmq->clientTopics) {
H
Haojun Liao 已提交
1503
    taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
X
Xiaoyu Wang 已提交
1504
  }
H
Haojun Liao 已提交
1505
  tmq->clientTopics = newTopics;
1506

X
Xiaoyu Wang 已提交
1507
  int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
H
Haojun Liao 已提交
1508
  atomic_store_8(&tmq->status, flag);
X
Xiaoyu Wang 已提交
1509
  atomic_store_32(&tmq->epoch, epoch);
H
Haojun Liao 已提交
1510

1511
  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
X
Xiaoyu Wang 已提交
1512 1513 1514
  return set;
}

1515
/*
 * RPC callback for an ask-ep request.
 *
 * The user function pParam->pUserFn is invoked on every path: with NULL data when the
 * consumer is closed or the request failed, otherwise with pMsg. If the response epoch is
 * not newer than the local one and the consumer is in RECOVER state, the status is moved
 * back to READY or NO_TOPIC. pParam and the response buffers are freed here on every path.
 *
 * @return the response code (TSDB_CODE_TMQ_CONSUMER_CLOSED if the consumer is gone).
 */
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pParam);
    return terrno;
  }

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    pParam->pUserFn(tmq, code, NULL, pParam->pParam);

    taosReleaseRef(tmqMgmt.rsetId, pParam->refId);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pParam);
    return code;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  if (head->epoch <= epoch) {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
             tmq->consumerId, head->epoch, epoch);

    // status is written with atomic_store_8 elsewhere, so read it atomically too
    if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
      // zero-initialised: the decode result is not checked, and tDeleteSMqAskEpRsp must
      // never be handed uninitialised pointers
      SMqAskEpRsp rsp = {0};
      tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
      int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
      atomic_store_8(&tmq->status, flag);
      tDeleteSMqAskEpRsp(&rsp);
    }
  } else {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
             head->epoch, epoch);
  }

  pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
  taosReleaseRef(tmqMgmt.rsetId, pParam->refId);

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pParam);
  return code;
}

L
Liu Jicong 已提交
1572
/*
 * Fill a poll (consume) request for one vgroup of one subscribed topic.
 *
 * The subscription key written to pReq->subKey is "<groupId><TMQ_SEPARATOR><topicName>".
 * NOTE(review): the key is written without a bounds check; this relies on subKey being large
 * enough for both names — confirm against the SMqPollReq definition.
 * A fresh request id is generated for every call.
 */
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // consumer-wide settings
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->timeout = timeout;

  // per-vgroup settings: the target vnode and the offset to continue from
  pReq->head.vgId = pVg->vgId;
  pReq->reqOffset = pVg->offsetInfo.currentOffset;

  // subKey = groupId + separator + topicName
  char*  pKey = pReq->subKey;
  size_t prefixLen = strlen(tmq->groupId);
  memcpy(pKey, tmq->groupId, prefixLen);
  pKey[prefixLen] = TMQ_SEPARATOR;
  strcpy(&pKey[prefixLen + 1], pTopic->topicName);

  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1588 1589
/*
 * Build the user-visible result object for a meta poll response.
 *
 * Copies the topic/db names and vgroup id from the wrapper, and shallow-copies (memcpy) the
 * wrapper's metaRsp into the new object. Any heap buffers referenced by metaRsp are therefore shared
 * with, and presumably released through, the returned object — NOTE(review): confirm ownership.
 *
 * Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY if the object cannot be allocated.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

1599
/*
 * Build the user-visible result object for a data poll response.
 *
 * The wrapper's dataRsp is shallow-copied into the new object. When the response carries no schema
 * of its own (withSchema == 0), the topic's schema is installed in resInfo.
 *
 * *numOfRows receives the number of rows in all blocks of this response, and the same amount is
 * added to pVg->numOfRows. Each block's numOfRows is stored big-endian, hence the byte-order
 * conversion.
 *
 * Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY if the object cannot be allocated; in that
 * case *numOfRows is 0 and pVg is not touched.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  // zero the out-parameter first so the caller's accounting stays correct even on failure
  (*numOfRows) = 0;

  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ;

  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);

  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1629 1630
/*
 * Build the user-visible result object for a taosx (data + table-creation meta) poll response.
 *
 * The wrapper's taosxRsp is shallow-copied into the new object. When the response carries no schema
 * of its own (withSchema == 0), the topic's schema is installed in resInfo.
 *
 * Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY if the object cannot be allocated.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679
/*
 * Undo the claim on a vgroup whose poll request could not be sent.
 *
 * The vgroup goes back to IDLE, so a later tmqPollImpl round can claim it again. rspSem is posted to
 * wake a thread waiting for responses. Always returns -1.
 */
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
  const int32_t failure = -1;

  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&pTmq->rspSem);

  return failure;
}

/*
 * Serialize one poll (TDMT_VND_TMQ_CONSUME) request for a single vgroup and send it asynchronously.
 *
 * On success:
 *  - the serialized request and the callback param are attached to an SMsgSendInfo handed to
 *    asyncSendMsgToServer, and the response is delivered to tmqPollCb;
 *  - the vgroup's and consumer's poll counters are incremented and pVg->seekUpdated is cleared;
 *  - returns TSDB_CODE_SUCCESS.
 *
 * On any failure before sending:
 *  - everything allocated here is released and terrno is set;
 *  - handleErrorBeforePoll puts the vgroup back to IDLE, and its result (-1) is returned.
 */
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq      req = {0};
  char*           msg = NULL;
  SMqPollCbParam* pParam = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  int32_t         msgSize = 0;
  int64_t         transporterId = 0;
  char            offsetFormatBuf[80];

  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first pass computes the encoded size, second pass encodes into the buffer
  msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    goto _err;
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (msg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    goto _err;
  }

  // calloc so that no field of the callback param is ever left uninitialized
  pParam = taosMemoryCalloc(1, sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  pParam->pVg = pVg;
  pParam->pTopic = pTopic;
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);
  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

  pVg->pollCnt++;
  // The seeked offset has now been sent. Clearing the flag lets later poll responses
  // update currentOffset again.
  pVg->seekUpdated = false;
  pTmq->pollCnt++;

  return TSDB_CODE_SUCCESS;

_err:
  taosMemoryFree(pParam);
  taosMemoryFree(msg);
  return handleErrorBeforePoll(pVg, pTmq);
}

1714
// broadcast the poll request to all related vnodes
H
Haojun Liao 已提交
1715
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1716
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1717
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
1718 1719

  for (int i = 0; i < numOfTopics; i++) {
X
Xiaoyu Wang 已提交
1720
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
X
Xiaoyu Wang 已提交
1721
    int32_t         numOfVg = taosArrayGetSize(pTopic->vgs);
1722 1723

    for (int j = 0; j < numOfVg; j++) {
X
Xiaoyu Wang 已提交
1724
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
X
Xiaoyu Wang 已提交
1725 1726 1727
      if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) {  // less than 100ms
        tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
                 tmq->epoch, pVg->vgId);
H
Haojun Liao 已提交
1728 1729 1730
        continue;
      }

1731
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1732
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1733
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
H
Haojun Liao 已提交
1734
        tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
X
Xiaoyu Wang 已提交
1735
                 pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1736 1737
        continue;
      }
1738

L
Liu Jicong 已提交
1739
      atomic_store_32(&pVg->vgSkipCnt, 0);
1740 1741 1742
      int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
D
dapan1121 已提交
1743
      }
X
Xiaoyu Wang 已提交
1744 1745
    }
  }
1746

1747
  tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
X
Xiaoyu Wang 已提交
1748 1749 1750
  return 0;
}

H
Haojun Liao 已提交
1751
/*
 * Process a queued response that is not a data poll response.
 *
 * Only endpoint responses (TMQ_MSG_TYPE__EP_RSP) are handled:
 *  - If the response's epoch is newer than the consumer's, the local endpoint info is updated from
 *    it, the decoded message is deleted, and *pReset is set to true.
 *  - Otherwise the wrapper's content is released via tmqFreeRspWrapper and *pReset is set to false.
 * Both cases return 0.
 *
 * Any other response type returns -1 and leaves *pReset untouched. The queue item itself is never
 * freed here; the caller does that.
 */
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewerEpoch = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (!isNewerEpoch) {
    tmqFreeRspWrapper(rspWrapper);
    *pReset = false;
    return 0;
  }

  SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
  doUpdateLocalEp(tmq, rspWrapper->epoch, pEpRsp);
  tDeleteSMqAskEpRsp(pEpRsp);
  *pReset = true;
  return 0;
}

H
Haojun Liao 已提交
1771
/*
 * Drain the consumer's response queues and return the first result that should reach the user.
 *
 * Items are taken from tmq->qall. When qall is empty, all items currently in tmq->mqueue are moved
 * into it first. Returns NULL when both are empty. Also returns NULL when an END_RSP is met, with
 * terrno set to TSDB_CODE_TQ_NO_COMMITTED_OFFSET.
 *
 * Data (POLL_RSP), meta (POLL_META_RSP) and taosx (TAOSX_RSP) responses:
 *  - A response whose epoch differs from the consumer's current epoch is discarded.
 *  - Otherwise the vgroup is set back to IDLE and its current offset is updated, subject to the
 *    per-type conditions below. A result object is then built and returned.
 *  - Empty data/taosx responses are consumed here. They only stamp emptyBlockReceiveTs, so that
 *    tmqPollImpl backs off that vgroup.
 *
 * Any other response is passed to tmqHandleNoPollRsp. If that reports an epoch reset and
 * pollIfReset is true, a new poll round (tmqPollImpl) is started before draining continues.
 *
 * The returned pointer is an SMqRspObj*, SMqMetaRspObj* or SMqTaosxRspObj*, or NULL.
 */
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);

  while (1) {
    SMqRspWrapper* pRspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&pRspWrapper);

    if (pRspWrapper == NULL) {
      // qall is exhausted: refill it from the shared queue and try once more
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pRspWrapper);
      if (pRspWrapper == NULL) {
        return NULL;
      }
    }

    tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);

    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pRspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
      return NULL;
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;

      int32_t     consumerEpoch = atomic_load_32(&tmq->epoch);
      SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;

      if (pDataRsp->head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;

        // update the epset
        if (pollRspWrapper->pEpset != NULL) {
          SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
          SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
          tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId, pVg->vgId,
                   pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
          pVg->epSet = *pollRspWrapper->pEpset;
        }

        // update the local offset value only for the returned values, only when the local offset is NOT updated
        // by tmq_offset_seek function
        if (!pVg->seekUpdated) {
          pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
        }

        // update the status
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        // update the valid wal version range
        pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
        pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
        pVg->receivedInfoFromVnode = true;

        char buf[80];
        tFormatOffset(buf, tListLen(buf), &pDataRsp->rspOffset);
        if (pDataRsp->blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   " total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          taosFreeQitem(pollRspWrapper);
        } else {  // build rsp
          int64_t    numOfRows = 0;
          SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
          tmq->totalRows += numOfRows;
          pVg->emptyBlockReceiveTs = 0;
          tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
                   pollRspWrapper->reqId);
          taosFreeQitem(pollRspWrapper);
          return pRsp;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      // todo handle the wal range and epset for each vgroup
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        if (pollRspWrapper->metaRsp.rspOffset.type != 0) {  // if offset is validate
          pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
        }
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        if (pollRspWrapper->taosxRsp.rspOffset.type != 0) {  // if offset is validate
          pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
        }

        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          continue;
        } else {
          pVg->emptyBlockReceiveTs = 0;  // reset the ts
        }

        // build rsp: plain data when no tables are created, otherwise a data + meta (taosx) result
        void*   pRsp = NULL;
        int64_t numOfRows = 0;
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
        } else {
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
        }

        tmq->totalRows += numOfRows;

        char buf[80];
        tFormatOffset(buf, tListLen(buf), &pVg->offsetInfo.currentOffset);
        // log the block count from taosxRsp, the member this branch has been reading from
        tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                 ", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64,
                 tmq->consumerId, pVg->vgId, buf, pollRspWrapper->taosxRsp.blockNum, numOfRows, pVg->numOfRows,
                 tmq->totalRows, pollRspWrapper->reqId);

        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else {
      tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);

      bool reset = false;
      tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
      taosFreeQitem(pRspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
    }
  }
}

1938
/*
 * Public API: poll the consumer for its next result.
 *
 * timeout is in milliseconds. A negative value means wait indefinitely; the wait is still done in
 * 1000 ms slices.
 *
 * Returns a TMQ result object, or NULL in any of these cases:
 *  - tmq is NULL (terrno = TSDB_CODE_INVALID_PARA);
 *  - the consumer is still in INIT status (after a 500 ms sleep);
 *  - the consumer could not leave RECOVER status within the retry limit;
 *  - no committed offset exists (terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET);
 *  - the timeout elapses.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  if (tmq == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  void*   rspObj = NULL;
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
           timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  // Ask for the endpoints until the consumer leaves RECOVER status. Every failed attempt sleeps
  // 500 ms and counts towards the retry limit, whatever its error code. This way a persistent error
  // other than TSDB_CODE_MND_CONSUMER_NOT_READY cannot make this loop spin without delay or bound.
  int32_t retryCnt = 0;
  while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t code = doAskEp(tmq);
    if (code == TSDB_CODE_SUCCESS) {
      retryCnt = 0;  // a successful round trip starts a fresh retry budget; re-check the status
      continue;
    }

    if (retryCnt++ > 40) {
      tscWarn("consumer:0x%" PRIx64 " failed to recover, code:%s", tmq->consumerId, tstrerror(code));
      return NULL;
    }

    tscDebug("consumer:0x%" PRIx64 " not ready, code:%s, retry:%d/40 in 500ms", tmq->consumerId, tstrerror(code),
             retryCnt);
    taosMsleep(500);
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);

    if (tmqPollImpl(tmq, timeout) < 0) {
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout >= 0) {
      int64_t currentTime = taosGetTimestampMs();
      int64_t elapsedTime = currentTime - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
                 tmq->consumerId, tmq->epoch, startTime, currentTime);
        return NULL;
      }
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
    }
  }
}

1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009
/*
 * Write a debug-level summary of what this consumer received.
 *
 * The summary covers the total poll count, total rows, number of topics and final epoch, followed by
 * the per-vgroup row counts of every subscribed topic.
 */
static void displayConsumeStatistics(const tmq_t* pTmq) {
  const int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);

  tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);

  int32_t topicIdx = 0;
  while (topicIdx < topicCnt) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, topicIdx);
    tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, topicIdx);

    const int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t vgIdx = 0; vgIdx < vgCnt; vgIdx++) {
      const SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);
      tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, vgIdx, pVg->vgId, pVg->numOfRows);
    }

    topicIdx++;
  }

  tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
2015

2016
/*
 * Public API: close a consumer.
 *
 * If the consumer is READY:
 *  - It first commits all offsets synchronously when auto-commit is enabled. If that commit fails,
 *    its code is returned and the consumer is NOT closed.
 *  - It then subscribes to a freshly created (presumably empty) topic list, retrying up to 6 times
 *    at 500 ms intervals while the mnode reports TSDB_CODE_MND_CONSUMER_NOT_READY.
 * In any other status it is closed directly.
 *
 * Finally the consumer's reference is removed from the global ref set. Returns 0, or
 * TSDB_CODE_INVALID_PARA when tmq is NULL.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return TSDB_CODE_INVALID_PARA;
  }

  // status is changed with atomic stores from the rsp callbacks, so read it atomically, once
  int8_t status = atomic_load_8(&tmq->status);
  tscDebug("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, status);
  displayConsumeStatistics(tmq);

  if (status == TMQ_CONSUMER_STATUS__READY) {
    // if auto commit is set, commit before close consumer. Otherwise, do nothing.
    if (tmq->autoCommit) {
      int32_t rsp = tmq_commit_sync(tmq, NULL);
      if (rsp != 0) {
        return rsp;
      }
    }

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    while (1) {
      int32_t rsp = tmq_subscribe(tmq, lst);
      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }
      retryCnt++;
      taosMsleep(500);
    }

    tmq_list_destroy(lst);
  } else {
    tscWarn("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
2049

L
Liu Jicong 已提交
2050 2051
/*
 * Public API: human-readable text for a TMQ return code.
 * 0 maps to "success", -1 to "fail", and any other code to tstrerror(err).
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
2059

L
Liu Jicong 已提交
2060 2061 2062 2063 2064
/*
 * Public API: classify a result returned by tmq_consumer_poll.
 * Returns TMQ_RES_DATA, TMQ_RES_TABLE_META or TMQ_RES_METADATA, or TMQ_RES_INVALID for anything
 * else.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
2072
/*
 * Public API: topic name of a TMQ result, without the qualifier before the first '.'.
 *
 * Returns NULL if res is NULL or is not a TMQ data, meta or metadata result. If the stored full name
 * contains no '.', the whole name is returned; dereferencing strchr's NULL result would be undefined
 * behavior.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* sep = strchr(fullName, '.');
  return (sep != NULL) ? sep + 1 : fullName;
}

L
Liu Jicong 已提交
2087 2088 2089 2090
/*
 * Public API: database name of a TMQ result, without the qualifier before the first '.'.
 *
 * Returns NULL if res is NULL or is not a TMQ data, meta or metadata result. If the stored full name
 * contains no '.', the whole name is returned; dereferencing strchr's NULL result would be undefined
 * behavior.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* sep = strchr(fullName, '.');
  return (sep != NULL) ? sep + 1 : fullName;
}

L
Liu Jicong 已提交
2102 2103 2104 2105
/*
 * Public API: id of the vgroup a TMQ result came from, or -1 if res is not a TMQ data, meta or
 * metadata result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  int32_t vgId = -1;

  if (TD_RES_TMQ(res)) {
    vgId = ((SMqRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_META(res)) {
    vgId = ((SMqMetaRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_METADATA(res)) {
    vgId = ((SMqTaosxRspObj*)res)->vgId;
  }

  return vgId;
}
L
Liu Jicong 已提交
2116

2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139
/*
 * Public API: WAL version carried by a TMQ result's response offset.
 *
 * Returns -1 when res is not a TMQ result or when its offset is not a log (TMQ_OFFSET__LOG) offset.
 * Data read from tsdb has no valid offset info.
 */
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  const STqOffsetVal* pOffset = NULL;

  if (TD_RES_TMQ(res)) {
    pOffset = &((SMqRspObj*)res)->rsp.rspOffset;
  } else if (TD_RES_TMQ_META(res)) {
    pOffset = &((SMqMetaRspObj*)res)->metaRsp.rspOffset;
  } else if (TD_RES_TMQ_METADATA(res)) {
    pOffset = &((SMqTaosxRspObj*)res)->rsp.rspOffset;
  }

  if (pOffset != NULL && pOffset->type == TMQ_OFFSET__LOG) {
    return pOffset->version;
  }
  return -1;
}

L
Liu Jicong 已提交
2140 2141 2142 2143 2144 2145 2146
/*
 * Public API: name of the table for the block currently selected by the result's iterator (resIter).
 *
 * Only data and metadata results carry table names. Returns NULL when:
 *  - table names were not requested;
 *  - no name list is present;
 *  - the iterator is outside [0, blockNum).
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  bool    withTbName = false;
  SArray* pTbNames = NULL;
  int64_t iter = -1;
  int64_t numOfBlocks = 0;

  if (TD_RES_TMQ(res)) {
    SMqRspObj* pDataObj = (SMqRspObj*)res;
    withTbName = (pDataObj->rsp.withTbName != 0);
    pTbNames = pDataObj->rsp.blockTbName;
    iter = pDataObj->resIter;
    numOfBlocks = pDataObj->rsp.blockNum;
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosxObj = (SMqTaosxRspObj*)res;
    withTbName = (pTaosxObj->rsp.withTbName != 0);
    pTbNames = pTaosxObj->rsp.blockTbName;
    iter = pTaosxObj->resIter;
    numOfBlocks = pTaosxObj->rsp.blockNum;
  } else {
    return NULL;
  }

  if (!withTbName || pTbNames == NULL || iter < 0 || iter >= numOfBlocks) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pTbNames, iter);
}
2158

2159 2160 2161 2162
/*
 * Public API: commit offsets asynchronously; cb(tmq, code, param) reports the outcome.
 * pRes == NULL commits the offsets of all vgroups, otherwise only the offset carried by pRes.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (pRes != NULL) {
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param);
    return;
  }

  asyncCommitAllOffsets(tmq, cb, param);
}

2167
// Completion callback used by tmq_commit_sync.
// Stores the commit result in the SSyncCommitInfo passed as param, then wakes the waiting thread.
static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pCommitInfo = param;

  pCommitInfo->code = code;
  tsem_post(&pCommitInfo->sem);
}
2172

2173 2174 2175 2176 2177 2178 2179 2180 2181
/*
 * Public API: commit offsets and block until the commit completes.
 *
 * pRes == NULL commits the offsets of all vgroups, otherwise only the offset carried by pRes. The
 * calling thread waits on a semaphore that commitCallBackFn posts.
 *
 * Returns the commit result code. Returns TSDB_CODE_OUT_OF_MEMORY (and sets terrno) if the wait
 * context cannot be allocated.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to malloc sync commit info", tmq->consumerId);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  if (pRes == NULL) {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
  } else {
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo);
  }

  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  tscDebug("consumer:0x%" PRIx64 " sync commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}

/*
 * askEp callback used by doAskEp.
 *
 * On success it decodes the SMqAskEpRsp that follows the SMqRspHead in pDataBuf, then applies it
 * with doUpdateLocalEp using the epoch from that head. In every case it records code in the
 * SAskEpInfo and posts its semaphore, so the waiting doAskEp can return.
 */
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  SAskEpInfo* pAskInfo = (SAskEpInfo*)param;
  pAskInfo->code = code;

  if (code == TSDB_CODE_SUCCESS) {
    SMqRspHead* pHead = pDataBuf->pData;
    void*       pBody = POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead));

    SMqAskEpRsp epRsp = {0};
    tDecodeSMqAskEpRsp(pBody, &epRsp);
    doUpdateLocalEp(pTmq, pHead->epoch, &epRsp);
    tDeleteSMqAskEpRsp(&epRsp);
  }

  tsem_post(&pAskInfo->sem);
}

/*
 * askEp callback that defers processing to the poll thread.
 *
 * It wraps the decoded SMqAskEpRsp in an EP_RSP queue item and pushes it onto tmq->mqueue, where
 * tmqHandleAllRsp hands it to tmqHandleNoPollRsp. On failure only terrno is set (to the incoming
 * code, or to TSDB_CODE_OUT_OF_MEMORY if the queue item cannot be allocated) and nothing is queued.
 */
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return;
  }

  SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
  if (pWrapper == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  SMqRspHead* head = pDataBuf->pData;

  pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
  pWrapper->epoch = head->epoch;

  // msg is the last member of the queue item. Copying sizeof(SMqRspHead) bytes into a smaller msg
  // would write past the end of the allocation, so only copy the raw head when it fits. The body
  // decode below fills msg either way.
  if (sizeof(pWrapper->msg) >= sizeof(SMqRspHead)) {
    memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead));
  }
  tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg);

  taosWriteQitem(pTmq->mqueue, pWrapper);
}

/*
 * Synchronously ask the mnode for this consumer's endpoints.
 *
 * It issues asyncAskEp with updateEpCallbackFn and blocks on a semaphore until that callback has run.
 * Returns the code reported to the callback. Returns TSDB_CODE_OUT_OF_MEMORY (and sets terrno) if
 * the wait context cannot be allocated.
 */
int32_t doAskEp(tmq_t* pTmq) {
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEp info", pTmq->consumerId);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  tsem_init(&pInfo->sem, 0, 0);

  asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
  tsem_wait(&pInfo->sem);

  int32_t code = pInfo->code;
  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);
  return code;
}

/**
 * Sends an asynchronous TDMT_MND_TMQ_ASK_EP request for this consumer to the mnode.
 *
 * If anything fails locally before the message is handed to the transporter (serialization or
 * allocation), `askEpFn` is invoked immediately with the error code and a NULL buffer. Otherwise
 * `askEpFn` and `param` are stored in an SMqAskEpCbParam that is passed to askEpCallbackFn when
 * the response arrives.
 */
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
  SMqAskEpReq req = {0};
  req.consumerId = pTmq->consumerId;
  req.epoch = pTmq->epoch;
  // bounded copy: never writes past req.cgroup and always NUL-terminates it
  tstrncpy(req.cgroup, pTmq->groupId, tListLen(req.cgroup));

  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
    taosMemoryFree(pReq);

    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pReq);

    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  pParam->pUserFn = askEpFn;
  pParam->pParam = param;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  // ownership of pReq and pParam moves to sendInfo from here on
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = askEpCallbackFn;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
}

/**
 * Formats the lookup key "<topicName>:<vg>" into `dst`.
 *
 * `dst` must be large enough for the result; no bound is applied. Returns the number of characters
 * written, not counting the terminating NUL (negative on an output error).
 */
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

/**
 * Final step of a commit once no more responses are outstanding.
 *
 * Resolves the consumer from pParamSet->refId through the tmqMgmt ref set.
 * - If the consumer is still alive, it invokes pParamSet->callbackFn(consumer, code, userParam),
 *   frees the param set, releases the ref and returns 0.
 * - If the consumer is gone, it frees the param set without invoking the callback, sets terrno to
 *   TSDB_CODE_TMQ_CONSUMER_CLOSED and returns -1.
 *
 * NOTE(review): when the callback is skipped, a synchronous committer waiting on it (e.g. via a
 * semaphore) is never notified. Confirm that this cannot happen while a waiter exists.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  int64_t refId = pParamSet->refId;
  tmq_t*  pTmq = taosAcquireRef(tmqMgmt.rsetId, refId);

  if (pTmq != NULL) {
    // every waited-for response has arrived: notify the requester
    pParamSet->callbackFn(pTmq, pParamSet->code, pParamSet->userParam);
    taosMemoryFree(pParamSet);
    taosReleaseRef(tmqMgmt.rsetId, refId);
    return 0;
  }

  taosMemoryFree(pParamSet);
  terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
  return -1;
}

2334
/**
 * Accounts for one received commit response.
 *
 * Atomically decrements pParamSet->waitingRspNum. While responses are still outstanding, it only
 * logs the remaining count. When the count reaches zero, it logs completion and hands the param set
 * to tmqCommitDone, which takes ownership of it.
 */
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remain = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  if (remain != 0) {
    tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remain);
    return;
  }

  tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  tmqCommitDone(pParamSet);
}
2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366

/**
 * Advances a TMQ response object to its next data block and loads that block into its resInfo.
 *
 * When the response carries schemas, the block's schema is installed first. The row/column/length
 * and conversion buffers derived from the previous schema are then freed. Returns the response's
 * resInfo, or NULL once the iterator has moved past rsp.blockNum blocks.
 */
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
  SMqRspObj* pRspObj = (SMqRspObj*)res;

  pRspObj->resIter++;
  if (pRspObj->resIter >= pRspObj->rsp.blockNum) {
    return NULL;  // all blocks consumed
  }

  SReqResultInfo* pResInfo = &pRspObj->resInfo;

  if (pRspObj->rsp.withSchema) {
    SSchemaWrapper* pSchemaWrapper = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
    setResSchemaInfo(pResInfo, pSchemaWrapper->pSchema, pSchemaWrapper->nCols);

    // buffers sized for the previous schema are stale now
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    taosMemoryFreeClear(pResInfo->convertJson);
  }

  SRetrieveTableRsp* pBlock = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
  setQueryResultFromRsp(pResInfo, pBlock, convertUcs4, false);
  return pResInfo;
}

2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389
/**
 * Transport callback for one TDMT_VND_TMQ_VG_WALINFO request sent by tmq_get_topic_assignment.
 *
 * On success, it decodes the SMqDataRsp that follows the SMqRspHead in `pMsg` and appends a
 * tmq_topic_assignment to pCommon->pList. The assignment takes its WAL begin/end from the head and
 * its current offset from the response. On failure (transport error, short message or decode
 * error), it records the error in pCommon->code.
 *
 * When this is the last of pParam->totalReq responses, it posts pCommon->rsp to wake the waiter.
 * The waiter may then destroy pCommon, so the response is counted only after every write to pCommon
 * has finished.
 *
 * Always frees `pParam` and returns 0.
 */
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon*       pCommon = pParam->pCommon;
  int32_t            totalReq = pParam->totalReq;

  // the body length below is computed as len - sizeof(SMqRspHead); reject messages too short for that
  if (code == TSDB_CODE_SUCCESS && (int64_t)pMsg->len < (int64_t)sizeof(SMqRspHead)) {
    code = TSDB_CODE_INVALID_MSG;
  }

  if (code == TSDB_CODE_SUCCESS) {
    SMqDataRsp rsp = {0};
    SDecoder   decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    int32_t decodeRet = tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);

    if (decodeRet < 0) {
      // never report an offset taken from a partially decoded response
      code = TSDB_CODE_INVALID_MSG;
    } else {
      SMqRspHead* pHead = pMsg->pData;

      tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                         .end = pHead->walever,
                                         .currentOffset = rsp.rspOffset.version,
                                         .vgId = pParam->vgId};

      taosThreadMutexLock(&pCommon->mutex);
      taosArrayPush(pCommon->pList, &assignment);
      taosThreadMutexUnlock(&pCommon->mutex);
    }
    // NOTE(review): rsp is not released here; confirm whether tDecodeMqDataRsp allocates (e.g. block
    // arrays) for WAL-info responses and, if so, free it with the matching delete helper.
  }

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
    taosThreadMutexLock(&pCommon->mutex);
    pCommon->code = code;
    taosThreadMutexUnlock(&pCommon->mutex);
  }

  // Count this response only now: once the final count is reached the waiter may read pList and
  // destroy pCommon, so no callback may still be writing to it at that point.
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == totalReq) {
    tsem_post(&pCommon->rsp);
  }

  taosMemoryFree(pParam);
  return 0;
}

/**
 * Releases an SMqVgCommon and everything it owns: its copy of the topic name, mutex, semaphore and
 * assignment list, and finally the struct itself.
 */
static void destroyCommonInfo(SMqVgCommon* pInfo) {
  taosMemoryFree(pInfo->pTopicName);
  taosThreadMutexDestroy(&pInfo->mutex);
  tsem_destroy(&pInfo->rsp);
  taosArrayDestroy(pInfo->pList);

  taosMemoryFree(pInfo);
}

H
Haojun Liao 已提交
2413
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
H
Haojun Liao 已提交
2414
                                 int32_t* numOfAssignment) {
H
Haojun Liao 已提交
2415 2416 2417
  *numOfAssignment = 0;
  *assignment = NULL;

2418
  int32_t accId = tmq->pTscObj->acctId;
2419
  char    tname[128] = {0};
2420 2421 2422
  sprintf(tname, "%d.%s", accId, pTopicName);

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
H
Haojun Liao 已提交
2423 2424 2425 2426 2427 2428
  if (pTopic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // in case of snapshot is opened, no valid offset will return
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
2429 2430 2431 2432 2433 2434 2435 2436

  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
    return TSDB_CODE_OUT_OF_MEMORY;
  }

2437 2438
  bool needFetch = false;

H
Haojun Liao 已提交
2439 2440
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
2441
    if (!pClientVg->receivedInfoFromVnode) {
2442 2443 2444
      needFetch = true;
      break;
    }
H
Haojun Liao 已提交
2445 2446 2447 2448 2449 2450 2451 2452 2453 2454

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
      pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
    } else {
      pAssignment->currentOffset = 0;
    }

    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
2455
    pAssignment->vgId = pClientVg->vgId;
H
Haojun Liao 已提交
2456 2457
  }

2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539
  if (needFetch) {
    SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
    if (pCommon == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return terrno;
    }

    pCommon->pList= taosArrayInit(4, sizeof(tmq_topic_assignment));
    tsem_init(&pCommon->rsp, 0, 0);
    taosThreadMutexInit(&pCommon->mutex, 0);
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
    pCommon->consumerId = tmq->consumerId;

    terrno = TSDB_CODE_OUT_OF_MEMORY;
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);

      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
      if (pParam == NULL) {
        destroyCommonInfo(pCommon);
        return terrno;
      }

      pParam->epoch = tmq->epoch;
      pParam->vgId = pClientVg->vgId;
      pParam->totalReq = *numOfAssignment;
      pParam->pCommon = pCommon;

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);

      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        taosMemoryFree(pParam);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      char* msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        taosMemoryFree(pParam);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
        taosMemoryFree(pParam);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pParam);
        taosMemoryFree(msg);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
      sendInfo->requestId = req.reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqGetWalInfoCb;
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

      int64_t transporterId = 0;
      char    offsetFormatBuf[80];
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);

      tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
               tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
    }

    tsem_wait(&pCommon->rsp);
    int32_t code = pCommon->code;

    terrno = code;
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(*assignment);
2540
      *assignment = NULL;
2541 2542 2543 2544 2545 2546 2547 2548 2549
      *numOfAssignment = 0;
    } else {
      int32_t num = taosArrayGetSize(pCommon->pList);
      for(int32_t i = 0; i < num; ++i) {
        (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
      }
      *numOfAssignment = num;
    }

2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
      tmq_topic_assignment* p = &(*assignment)[j];

      for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
        if (pClientVg->vgId != p->vgId) {
          continue;
        }

        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;

        pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;

        char offsetBuf[80] = {0};
        tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);

        tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);

        pOffsetInfo->walVerBegin = p->begin;
        pOffsetInfo->walVerEnd = p->end;
        pOffsetInfo->currentOffset.version = p->currentOffset;
        pOffsetInfo->committedOffset.version = p->currentOffset;
      }
    }

2575 2576 2577 2578 2579
    destroyCommonInfo(pCommon);
    return code;
  } else {
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
2580 2581
}

2582
/**
 * Moves the consumer's position on one vgroup of a subscribed topic to WAL version `offset`, then
 * synchronously sends a TDMT_VND_TMQ_SEEK_TO_OFFSET commit to the vnode.
 *
 * The seek is only allowed when the vgroup's current offset is a WAL version (TMQ_OFFSET__LOG) and
 * `offset` lies within [walVerBegin, walVerEnd] of the cached offset info.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL handle/topic, unknown topic or
 *         vgroup, wrong offset type or out-of-range offset, TSDB_CODE_OUT_OF_MEMORY, or the code
 *         reported by the commit callback.
 */
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  if (pTopicName == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name, null", tmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[128] = {0};
  // bounded: an over-long name is truncated and simply fails the topic lookup below
  snprintf(tname, tListLen(tname), "%d.%s", accId, pTopicName);

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqClientVg* pVg = NULL;
  int32_t      numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg->vgId == vgId) {
      pVg = pClientVg;
      break;
    }
  }

  if (pVg == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
    return TSDB_CODE_INVALID_PARA;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;

  int32_t type = pOffsetInfo->currentOffset.type;
  if (type != TMQ_OFFSET__LOG) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
    return TSDB_CODE_INVALID_PARA;
  }

  if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) {
    tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
             tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
    return TSDB_CODE_INVALID_PARA;
  }

  // The offset type was verified to be TMQ_OFFSET__LOG above. Move the position and invalidate the
  // committed version, then commit to the vnode.
  pOffsetInfo->currentOffset.version = offset;
  pOffsetInfo->committedOffset.version = INT64_MIN;
  pVg->seekUpdated = true;

  SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = vgId};
  tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));

  tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);

  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  if (code != TSDB_CODE_SUCCESS) {
    // Use the caller's vgId (equal to pVg->vgId by the lookup above). pVg points into pTopic->vgs,
    // which may be replaced while we block, e.g. by an endpoint update.
    tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, vgId,
             tstrerror(code));
  }

  return code;
}