clientTmq.c 91.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22
#include "tdef.h"
#include "tglobal.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

X
Xiaoyu Wang 已提交
27 28
// Idle duration associated with receiving an empty block before polling resumes.
// NOTE(review): the unit and the use site are not visible in this part of the file - confirm.
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
// Default assigned to tmq_conf_t.autoCommitInterval by tmq_conf_new(); the matching
// configuration key is "auto.commit.interval.ms".
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
29

30 31
// Evaluates to true when the given offset value is negative.
// NOTE(review): presumably the reset-offset markers are encoded as negative values - confirm.
#define OFFSET_IS_RESET_OFFSET(_of)  ((_of) < 0)

32 33
// Callback type accepted by asyncAskEp(): receives the consumer, a result code, the response
// buffer and the opaque parameter that was handed to asyncAskEp().
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);

X
Xiaoyu Wang 已提交
34
struct SMqMgmt {
35 36 37
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
38
};
L
Liu Jicong 已提交
39

X
Xiaoyu Wang 已提交
40 41
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
42
static struct SMqMgmt tmqMgmt = {0};
43

L
Liu Jicong 已提交
44 45 46 47 48 49
// Minimal response header. SMqAskEpRspWrapper and SMqPollRspWrapper begin with the same two
// fields, so a queued response can be inspected through this type first.
typedef struct {
  int8_t  tmqRspType;  // discriminator for the concrete wrapper type
  int32_t epoch;       // epoch carried with the response
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
50 51 52
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
53 54
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
55
struct tmq_list_t {
L
Liu Jicong 已提交
56
  SArray container;
L
Liu Jicong 已提交
57
};
L
Liu Jicong 已提交
58

L
Liu Jicong 已提交
59
struct tmq_conf_t {
60 61 62 63 64 65 66 67
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
68 69 70 71 72
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
73
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
74
  void*          commitCbUserParam;
L
Liu Jicong 已提交
75 76 77
};

struct tmq_t {
78 79 80 81 82 83 84
  int64_t        refId;
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
  int8_t         withTbName;
  int8_t         useSnapshot;
  int8_t         autoCommit;
  int32_t        autoCommitInterval;
85
  int8_t         resetOffsetCfg;
86 87
  uint64_t       consumerId;
  bool           hbBgEnable;
L
Liu Jicong 已提交
88 89
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
90 91

  // status
wmmhello's avatar
wmmhello 已提交
92
  SRWLatch        lock;
L
Liu Jicong 已提交
93 94
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
95 96
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
97
  int32_t epSkipCnt;
L
Liu Jicong 已提交
98
#endif
99
  // poll info
X
Xiaoyu Wang 已提交
100 101
  int64_t pollCnt;
  int64_t totalRows;
L
Liu Jicong 已提交
102

L
Liu Jicong 已提交
103
  // timer
X
Xiaoyu Wang 已提交
104 105 106 107 108 109 110 111 112 113
  tmr_h       hbLiveTimer;
  tmr_h       epTimer;
  tmr_h       reportTimer;
  tmr_h       commitTimer;
  STscObj*    pTscObj;       // connection
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of rsp
  STaosQall*  qall;
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
  tsem_t      rspSem;
L
Liu Jicong 已提交
114 115
};

116 117
typedef struct SAskEpInfo {
  int32_t code;
H
Haojun Liao 已提交
118
  tsem_t  sem;
119 120
} SAskEpInfo;

X
Xiaoyu Wang 已提交
121 122 123 124 125 126 127 128
// Per-vgroup status values (see SMqClientVg.vgStatus).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status values (see tmq_t.status).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
133
// Task types written to tmq_t.delayedTask by the timer callbacks.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

H
Haojun Liao 已提交
139
typedef struct SVgOffsetInfo {
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
H
Haojun Liao 已提交
142 143 144 145 146 147 148 149 150 151
  int64_t      walVerBegin;
  int64_t      walVerEnd;
} SVgOffsetInfo;

typedef struct {
  int64_t       pollCnt;
  int64_t       numOfRows;
  SVgOffsetInfo offsetInfo;
  int32_t       vgId;
  int32_t       vgStatus;
H
Haojun Liao 已提交
152 153 154 155
  int32_t       vgSkipCnt;              // here used to mark the slow vgroups
  bool          receivedInfoFromVnode;  // has already received info from vnode
  int64_t       emptyBlockReceiveTs;    // once empty block is received, idle for ignoreCnt then start to poll data
  bool          seekUpdated;            // offset is updated by seek operator, therefore, not update by vnode rsp.
H
Haojun Liao 已提交
156
  SEpSet        epSet;
157 158
} SMqClientVg;

L
Liu Jicong 已提交
159
typedef struct {
160 161 162
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
163
  SSchemaWrapper schema;
164 165
} SMqClientTopic;

L
Liu Jicong 已提交
166 167
typedef struct {
  int8_t          tmqRspType;
168
  int32_t         epoch;  // epoch can be used to guard the vgHandle
169
  int32_t         vgId;
wmmhello's avatar
wmmhello 已提交
170
  char            topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
171 172
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
H
Haojun Liao 已提交
173
  uint64_t        reqId;
174
  SEpSet*         pEpset;
L
Liu Jicong 已提交
175
  union {
L
Liu Jicong 已提交
176 177
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
178
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
179
  };
L
Liu Jicong 已提交
180 181
} SMqPollRspWrapper;

L
Liu Jicong 已提交
182
typedef struct {
wmmhello's avatar
wmmhello 已提交
183 184
//  int64_t refId;
//  int32_t epoch;
L
Liu Jicong 已提交
185 186
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
187
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
188

L
Liu Jicong 已提交
189
typedef struct {
190 191 192 193
  int64_t          refId;
  int32_t          epoch;
  void*            pParam;
  __tmq_askep_fn_t pUserFn;
194 195
} SMqAskEpCbParam;

L
Liu Jicong 已提交
196
typedef struct {
197 198
  int64_t         refId;
  int32_t         epoch;
wmmhello's avatar
wmmhello 已提交
199 200 201
  char            topicName[TSDB_TOPIC_FNAME_LEN];
//  SMqClientVg*    pVg;
//  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
202
  int32_t         vgId;
X
Xiaoyu Wang 已提交
203
  uint64_t        requestId;  // request id for debug purpose
X
Xiaoyu Wang 已提交
204
} SMqPollCbParam;
205

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
// State referenced from SMqVgWalInfoParam through its pCommon pointer.
// NOTE(review): field roles below are inferred from names only - confirm at the use sites.
typedef struct SMqVgCommon {
  tsem_t        rsp;
  int32_t       numOfRsp;
  SArray*       pList;
  TdThreadMutex mutex;
  int64_t       consumerId;
  char*         pTopicName;
  int32_t       code;
} SMqVgCommon;

// Parameter block carrying a vgroup id, an epoch, a request count and a pointer to an
// SMqVgCommon.
// NOTE(review): presumably one instance per vgroup WAL-info request - confirm at the use site.
typedef struct SMqVgWalInfoParam {
  int32_t      vgId;
  int32_t      epoch;
  int32_t      totalReq;
  SMqVgCommon* pCommon;
} SMqVgWalInfoParam;

223
typedef struct {
224 225
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
226 227
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
228
  int32_t        code;
229
  tmq_commit_cb* callbackFn;
L
Liu Jicong 已提交
230 231
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
X
Xiaoyu Wang 已提交
232
  void* userParam;
233 234 235 236
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
237
  SMqVgOffset*         pOffset;
H
Haojun Liao 已提交
238 239 240
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
  int32_t              vgId;
  tmq_t*               pTmq;
241
} SMqCommitCbParam;
242

243 244 245 246 247
// Semaphore plus result code.
// NOTE(review): presumably used to wait for a commit to complete synchronously - confirm at the use site.
typedef struct SSyncCommitInfo {
  tsem_t  sem;
  int32_t code;
} SSyncCommitInfo;

248
static int32_t doAskEp(tmq_t* tmq);
249 250
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
251
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
252
                               int32_t index, int32_t totalVgroups, int32_t type);
X
Xiaoyu Wang 已提交
253 254 255
static void    commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void    asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void    addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
256

257
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
258
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
259 260 261 262 263
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

264
  conf->withTbName = false;
L
Liu Jicong 已提交
265
  conf->autoCommit = true;
266
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
267
  conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
268
  conf->hbBgEnable = true;
269

270 271 272
  return conf;
}

L
Liu Jicong 已提交
273
/*
 * Release a configuration created by tmq_conf_new(), including the heap copies of the
 * connection ip, user and password. A NULL conf is ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  // the three connection strings are the only separately allocated members
  char* owned[] = {conf->ip, conf->user, conf->pass};
  for (int32_t i = 0; i < (int32_t)(sizeof(owned) / sizeof(owned[0])); ++i) {
    if (owned[i]) {
      taosMemoryFree(owned[i]);
    }
  }

  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
289
  if (strcasecmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
290
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
291
    return TMQ_CONF_OK;
292
  }
L
Liu Jicong 已提交
293

294
  if (strcasecmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
295
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
296 297
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
298

299 300
  if (strcasecmp(key, "enable.auto.commit") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
301
      conf->autoCommit = true;
L
Liu Jicong 已提交
302
      return TMQ_CONF_OK;
303
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
304
      conf->autoCommit = false;
L
Liu Jicong 已提交
305 306 307 308
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
309
  }
L
Liu Jicong 已提交
310

311
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
312
    conf->autoCommitInterval = taosStr2int64(value);
L
Liu Jicong 已提交
313 314 315
    return TMQ_CONF_OK;
  }

316 317 318
  if (strcasecmp(key, "auto.offset.reset") == 0) {
    if (strcasecmp(value, "none") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
L
Liu Jicong 已提交
319
      return TMQ_CONF_OK;
320
    } else if (strcasecmp(value, "earliest") == 0) {
321
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
L
Liu Jicong 已提交
322
      return TMQ_CONF_OK;
323 324
    } else if (strcasecmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
L
Liu Jicong 已提交
325 326 327 328 329
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
330

331 332
  if (strcasecmp(key, "msg.with.table.name") == 0) {
    if (strcasecmp(value, "true") == 0) {
333
      conf->withTbName = true;
L
Liu Jicong 已提交
334
      return TMQ_CONF_OK;
335
    } else if (strcasecmp(value, "false") == 0) {
336
      conf->withTbName = false;
L
Liu Jicong 已提交
337
      return TMQ_CONF_OK;
338 339 340 341 342
    } else {
      return TMQ_CONF_INVALID;
    }
  }

343 344
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
345
      conf->snapEnable = true;
346
      return TMQ_CONF_OK;
347
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
348
      conf->snapEnable = false;
349 350 351 352 353 354
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

355
  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
356
    conf->snapBatchSize = taosStr2int64(value);
L
Liu Jicong 已提交
357 358 359
    return TMQ_CONF_OK;
  }

360
  if (strcasecmp(key, "enable.heartbeat.background") == 0) {
X
Xiaoyu Wang 已提交
361 362 363 364 365 366 367 368 369 370
    //    if (strcasecmp(value, "true") == 0) {
    //      conf->hbBgEnable = true;
    //      return TMQ_CONF_OK;
    //    } else if (strcasecmp(value, "false") == 0) {
    //      conf->hbBgEnable = false;
    //      return TMQ_CONF_OK;
    //    } else {
    tscError("the default value of enable.heartbeat.background is true, can not be seted");
    return TMQ_CONF_INVALID;
    //    }
L
Liu Jicong 已提交
371 372
  }

373
  if (strcasecmp(key, "td.connect.ip") == 0) {
374
    conf->ip = taosStrdup(value);
L
Liu Jicong 已提交
375 376
    return TMQ_CONF_OK;
  }
377

378
  if (strcasecmp(key, "td.connect.user") == 0) {
379
    conf->user = taosStrdup(value);
L
Liu Jicong 已提交
380 381
    return TMQ_CONF_OK;
  }
382

383
  if (strcasecmp(key, "td.connect.pass") == 0) {
384
    conf->pass = taosStrdup(value);
L
Liu Jicong 已提交
385 386
    return TMQ_CONF_OK;
  }
387

388
  if (strcasecmp(key, "td.connect.port") == 0) {
389
    conf->port = taosStr2int64(value);
L
Liu Jicong 已提交
390 391
    return TMQ_CONF_OK;
  }
392

393
  if (strcasecmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
394 395 396
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
397
  return TMQ_CONF_UNKNOWN;
398 399
}

X
Xiaoyu Wang 已提交
400
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
401

L
Liu Jicong 已提交
402 403
/*
 * Append a copy of the topic name src to the list.
 *
 * Returns 0 on success, -1 when list is NULL, src is NULL or empty, or memory cannot be
 * allocated. On failure the list is unchanged and nothing is leaked.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = taosStrdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(&list->container, &topic) == NULL) {
    taosMemoryFree(topic);  // the array did not take the copy, so release it here
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
410
/*
 * Destroy a topic list, releasing every stored topic name with taosMemoryFree.
 */
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
415 416 417 418 419 420 421 422 423 424
/* Return the number of topic names stored in the list. */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/*
 * Expose the list's backing storage as an array of C strings. The returned pointer is the
 * array's own data buffer, not a copy.
 */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

X
Xiaoyu Wang 已提交
425 426
/*
 * Look up the vgroup vgId of the topic named pName.
 *
 * pTopicList   - SArray<SMqClientTopic> to search
 * index        - out: position of the vgroup inside the topic's vgs array, -1 when not found
 * numOfVgroups - out: number of vgroups of the last topic whose name matched, 0 when none matched
 *
 * Returns the matching vgroup, or NULL when the topic or the vgroup is not present.
 */
static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
                                  int32_t* numOfVgroups) {
  int32_t topicCnt = taosArrayGetSize(pTopicList);
  *index = -1;
  *numOfVgroups = 0;

  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* pCur = taosArrayGet(pTopicList, t);
    if (strcmp(pCur->topicName, pName) == 0) {
      *numOfVgroups = taosArrayGetSize(pCur->vgs);

      int32_t v = 0;
      while (v < (*numOfVgroups)) {
        SMqClientVg* pCandidate = taosArrayGet(pCur->vgs, v);
        if (pCandidate->vgId == vgId) {
          *index = v;
          return pCandidate;
        }
        ++v;
      }
    }
  }

  return NULL;
}
449

450 451 452
// Notes on handling the commit response:
// 1. An updated epset does not need to be handled here; the response of the poll request handles it automatically.
// 2. Commit failure is not handled here yet. This one still needs to be resolved.
H
Haojun Liao 已提交
453
/*
 * Response callback for one commit message sent by doSendCommitMsg().
 *
 * param - the SMqCommitCbParam built for the message
 * pBuf  - response buffer; its pData and pEpSet are released here (may be NULL)
 * code  - result of the request; currently ignored, so a failed commit is neither
 *         retried nor reported from here
 *
 * Releases the encoded offset and the response buffers, then counts the response down on
 * the shared parameter set. Always returns 0.
 *
 * NOTE(review): param itself is not freed here; doSendCommitMsg() registers taosMemoryFree
 * as paramFreeFp, so presumably the transport layer releases it - confirm.
 */
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  // A retry-on-failure path was sketched here in the past but is disabled.
  // todo replace the pTmq with refId

  taosMemoryFree(pParam->pOffset);
  if (pBuf != NULL) {  // same guard as tmqHbCb
    taosMemoryFree(pBuf->pData);
    taosMemoryFree(pBuf->pEpSet);
  }

  commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
  return 0;
}

496
/*
 * Build and asynchronously send one commit-offset message for a vgroup.
 *
 * tmq          - consumer issuing the commit
 * pVg          - vgroup whose currentOffset is committed
 * pTopicName   - topic the vgroup belongs to; combined with the group id into the sub key
 * pParamSet    - state shared by all messages of this commit; its waitingRspNum and
 *                totalRspNum are incremented once the message is ready to be sent
 * index        - position of the vgroup within the topic (logging only)
 * totalVgroups - number of vgroups of the topic (logging only)
 * type         - message type placed into the request
 *
 * Returns TSDB_CODE_SUCCESS after handing the message to asyncSendMsgToServer(),
 * TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_INVALID_PARA otherwise. Every failure path releases
 * what was allocated so far and leaves pParamSet untouched. terrno is not set.
 */
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
                               int32_t index, int32_t totalVgroups, int32_t type) {
  SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset));
  if (pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOffset->consumerId = tmq->consumerId;
  pOffset->offset.val = pVg->offsetInfo.currentOffset;

  // sub key layout: <groupId><TMQ_SEPARATOR><topicName>
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->offset.subKey, tmq->groupId, groupLen);
  pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName);

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqVgOffset, pOffset, len, code);
  if (code < 0) {
    taosMemoryFree(pOffset);  // was leaked on this path before
    return TSDB_CODE_INVALID_PARA;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeMqVgOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;  // released by tmqCommitCb
  pParam->vgId = pVg->vgId;
  pParam->pTmq = tmq;

  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMsgSendInfo->msgInfo = (SDataBuf) { .pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL };

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = type;

  // count the message before it is sent
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);

  char commitBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
           tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
           totalVgroups, pMsgSendInfo->requestId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
585 586 587 588 589 590 591 592 593 594 595
/*
 * Find the subscribed topic named pTopicName among tmq->clientTopics.
 * Returns the topic, or NULL (after logging an error) when no topic has that name.
 */
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
  int32_t total = taosArrayGetSize(tmq->clientTopics);

  int32_t pos = 0;
  while (pos < total) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, pos);
    if (strcmp(pCandidate->topicName, pTopicName) == 0) {
      return pCandidate;
    }
    ++pos;
  }

  tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, total, pTopicName);
  return NULL;
}

600
/*
 * Commit the offset of the single vgroup that produced pRes.
 *
 * tmq       - consumer
 * pRes      - result object (data, meta or metadata response) naming the topic and vgroup
 * type      - message type forwarded to doSendCommitMsg()
 * pCommitFp - callback; invoked directly from here on every path except a successfully
 *             sent commit message
 * userParam - opaque argument for pCommitFp
 *
 * Direct callback codes: TSDB_CODE_INVALID_PARA for a NULL tmq/pRes,
 * TSDB_CODE_TMQ_INVALID_MSG for an unknown result type, TSDB_CODE_OUT_OF_MEMORY when the
 * parameter set cannot be allocated, TSDB_CODE_SUCCESS when the topic or vgroup is no longer
 * known, 0 when there is nothing new to commit, or the error from doSendCommitMsg().
 */
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) {
  if (pRes == NULL || tmq == NULL) {
    pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
    return;
  }

  // pick topic and vgroup out of whichever result flavor was handed in
  char*   pTopicName = NULL;
  int32_t vgId = 0;
  if (TD_RES_TMQ(pRes)) {
    SMqRspObj* pDataObj = (SMqRspObj*)pRes;
    pTopicName = pDataObj->topic;
    vgId = pDataObj->vgId;
  } else if (TD_RES_TMQ_META(pRes)) {
    SMqMetaRspObj* pMetaObj = (SMqMetaRspObj*)pRes;
    pTopicName = pMetaObj->topic;
    vgId = pMetaObj->vgId;
  } else if (TD_RES_TMQ_METADATA(pRes)) {
    SMqTaosxRspObj* pTaosxObj = (SMqTaosxRspObj*)pRes;
    pTopicName = pTaosxObj->topic;
    vgId = pTaosxObj->vgId;
  } else {
    pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
    return;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);

  SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
  if (pTopic == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId,
            pTopicName, numOfTopics);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // locate the vgroup inside the topic
  int32_t      numOfVgroups = taosArrayGetSize(pTopic->vgs);
  int32_t      pos = 0;
  SMqClientVg* pVg = NULL;
  for (; pos < numOfVgroups; pos++) {
    SMqClientVg* pCandidate = taosArrayGet(pTopic->vgs, pos);
    if (pCandidate->vgId == vgId) {
      pVg = pCandidate;
      break;
    }
  }

  if (pVg == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId,
            vgId, numOfVgroups, pTopicName);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // a commit is only sent when the current offset is valid and differs from the committed one
  bool needCommit = pVg->offsetInfo.currentOffset.type > 0 &&
                    !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset);

  int32_t code = 0;
  if (needCommit) {
    code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, pos, numOfVgroups, type);
    if (code == TSDB_CODE_SUCCESS) {
      return;  // the user callback is not invoked from here once the message has been sent
    }
  }

  // nothing was sent (no change, or the send failed): callback user function directly.
  taosMemoryFree(pParamSet);
  pCommitFp(tmq, code, userParam);
}

683
/*
 * Commit the current offset of every vgroup of every subscribed topic whose offset is valid
 * and differs from the committed one.
 *
 * tmq       - consumer
 * pCommitFp - callback; invoked directly with TSDB_CODE_OUT_OF_MEMORY when the parameter set
 *             cannot be allocated, or with TSDB_CODE_SUCCESS when no message was sent
 * userParam - opaque argument for pCommitFp
 *
 * A vgroup whose commit message cannot be sent is logged and skipped; the remaining vgroups
 * are still processed. For each sent message the vgroup's committedOffset is advanced to its
 * currentOffset right away.
 */
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  // init as 1 to prevent concurrency issue
  pParamSet->waitingRspNum = 1;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);

    tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
             numOfVgroups);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
        int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET);
        if (code != TSDB_CODE_SUCCESS) {
          // doSendCommitMsg reports failures through its return value and does not set terrno,
          // so the returned code is the one to log
          tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
                   tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, tstrerror(code),
                   j + 1, numOfVgroups);
          continue;
        }

        // update the offset value.
        pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.currentOffset.version, j + 1, numOfVgroups);
      }
    }
  }

  tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
           numOfTopics);

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // count down since waiting rsp num init as 1
  commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
}

742 743
static void generateTimedTask(int64_t refId, int32_t type) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
744
  if (tmq != NULL) {
S
Shengliang Guan 已提交
745
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
746
    *pTaskType = type;
747 748
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
wmmhello's avatar
wmmhello 已提交
749
    taosReleaseRef(tmqMgmt.rsetId, refId);
750
  }
751 752 753 754 755
}

/*
 * Timer callback: queue an ask-ep task for the consumer whose reference id is stored in
 * param (a heap-allocated int64_t), then release param.
 */
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t* pRefId = param;
  generateTimedTask(*pRefId, TMQ_DELAYED_TASK__ASK_EP);
  taosMemoryFree(param);
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
760
  int64_t refId = *(int64_t*)param;
761
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
762
  taosMemoryFree(param);
L
Liu Jicong 已提交
763 764 765
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
766 767 768
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
769
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
770 771 772 773
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
774 775

  taosReleaseRef(tmqMgmt.rsetId, refId);
776
  taosMemoryFree(param);
L
Liu Jicong 已提交
777 778
}

779
/*
 * Response callback for the heartbeat request: releases the response buffers when a
 * response is present. param and code are not used. Always returns 0.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
788
  int64_t refId = *(int64_t*)param;
789

X
Xiaoyu Wang 已提交
790
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
791
  if (tmq == NULL) {
L
Liu Jicong 已提交
792
    taosMemoryFree(param);
793 794
    return;
  }
D
dapan1121 已提交
795 796 797 798

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
799 800 801 802 803 804 805 806 807 808 809 810 811 812 813
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
  for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
    strcpy(data->topicName, pTopic->topicName);
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
    for(int j = 0; j < numOfVgroups; j++){
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
      offRows->vgId = pVg->vgId;
      offRows->rows = pVg->numOfRows;
      offRows->offset = pVg->offsetInfo.committedOffset;
    }
  }
D
dapan1121 已提交
814

L
Liu Jicong 已提交
815
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
816 817
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
818
    goto OVER;
D
dapan1121 已提交
819
  }
820

L
Liu Jicong 已提交
821
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
822 823
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
824
    goto OVER;
D
dapan1121 已提交
825
  }
826

D
dapan1121 已提交
827 828 829
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
830
    goto OVER;
D
dapan1121 已提交
831
  }
832 833 834 835

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
836
    goto OVER;
837
  }
838

839
  sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
840 841 842 843 844

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
845
  sendInfo->msgType = TDMT_MND_TMQ_HB;
846 847 848 849 850 851 852

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
853
  tDeatroySMqHbReq(&req);
854
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
855
  taosReleaseRef(tmqMgmt.rsetId, refId);
856 857
}

858 859
// Fallback commit callback used when the user registered none: it only logs a failed commit.
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;
  }

  tscDebug("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}

864
// Drain the consumer's delayedTask queue and execute every queued task.
//
//  - ASK_EP: asynchronously asks for the endpoints and re-arms the ep timer (1s).
//  - COMMIT: asynchronously commits all offsets and re-arms the commit timer
//            (autoCommitInterval ms).
//  - REPORT: currently a no-op.
//
// Returns TSDB_CODE_SUCCESS (0) on success, or TSDB_CODE_OUT_OF_MEMORY when the
// temporary qall cannot be allocated. If the ref id for a timer cannot be allocated the
// task itself is still executed, but that timer is not re-armed.
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      asyncAskEp(pTmq, addToQueueCallbackFn, NULL);

      // the timer callback owns and frees this ref id
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      } else {
        tscError("consumer:0x%" PRIx64 " failed to re-arm ep timer since out of memory", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;

      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);

      // the timer callback owns and frees this ref id
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId != NULL) {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      } else {
        tscError("consumer:0x%" PRIx64 " failed to re-arm commit timer since out of memory", pTmq->consumerId);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do for report tasks
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

907
// Release everything owned by a response wrapper, chosen by its tmqRspType. The wrapper
// itself is NOT freed here (callers follow up with taosFreeQitem). Always returns NULL.
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__END_RSP:
      // carries no payload
      break;

    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pEpWrapper->msg);
      break;
    }

    case TMQ_MSG_TYPE__POLL_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->dataRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->dataRsp.blockDataLen);
      taosArrayDestroyP(pPoll->dataRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosMemoryFree(pPoll->metaRsp.metaRsp);
      break;
    }

    case TMQ_MSG_TYPE__TAOSX_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
      taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      // taosx
      taosArrayDestroy(pPoll->taosxRsp.createTableLen);
      taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
      break;
    }

    default:
      // unknown types own nothing this function knows how to free
      break;
  }

  return NULL;
}

L
Liu Jicong 已提交
942
// Discard every response the consumer has not handled yet: first whatever is still
// sitting in qall, then everything pending in mqueue (moved into qall and drained).
void tmqClearUnhandleMsg(tmq_t* tmq) {
  for (int32_t pass = 0; pass < 2; ++pass) {
    if (pass == 1) {
      // second pass: pull the pending responses out of mqueue
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }

    SMqRspWrapper* pItem = NULL;
    for (;;) {
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        break;
      }

      tmqFreeRspWrapper(pItem);
      taosFreeQitem(pItem);
    }
  }
}

D
dapan1121 已提交
967
// Response callback for the subscribe request: records the response code in the
// caller's SMqSubscribeCbParam, releases the message buffers, and posts rspSem so the
// waiting tmq_subscribe() can continue. Always returns 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  // same buffer handling as tmqHbCb: tolerate a missing message and release both the
  // payload and the ep set
  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  tsem_post(&pParam->rspSem);
  return 0;
}
975

L
Liu Jicong 已提交
976
// Append the names of all topics the consumer currently holds to *topics, creating the
// list first when *topics is NULL.
//
// For each topic, the text after the first '.' of its stored name is appended; a stored
// name without a '.' is appended unchanged.
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the list cannot be created.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr returns NULL when there is no '.'; never compute NULL + 1
    char* pDot = strchr(topic->topicName, '.');
    tmq_list_append(*topics, (pDot != NULL) ? pDot + 1 : topic->topicName);
  }

  return 0;
}

L
Liu Jicong 已提交
987
// Unsubscribe from all topics by subscribing to an empty topic list.
//
// With auto commit enabled the offsets are committed synchronously first, and a commit
// failure is returned immediately. While the mnode answers
// TSDB_CODE_MND_CONSUMER_NOT_READY the subscribe call is retried every 500ms, for at
// most six retries. Returns the result of the last subscribe attempt.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  if (tmq->autoCommit) {
    int32_t commitCode = tmq_commit_sync(tmq, NULL);
    if (commitCode != 0) {
      return commitCode;
    }
  }
  taosSsleep(2);  // sleep 2s for hb to send offset and rows to server

  tmq_list_t* lst = tmq_list_new();

  int32_t rsp = tmq_subscribe(tmq, lst);
  for (int32_t retryCnt = 0; rsp == TSDB_CODE_MND_CONSUMER_NOT_READY && retryCnt <= 5; retryCnt++) {
    taosMsleep(500);
    rsp = tmq_subscribe(tmq, lst);
  }

  tmq_list_destroy(lst);
  return rsp;
}

1013 1014 1015 1016 1017 1018
static void freeClientVgImpl(void* param) {
  SMqClientTopic* pTopic = param;
  taosMemoryFreeClear(pTopic->schema.pSchema);
  taosArrayDestroy(pTopic->vgs);
}

1019
void tmqFreeImpl(void* handle) {
1020 1021
  tmq_t*  tmq = (tmq_t*)handle;
  int64_t id = tmq->consumerId;
L
Liu Jicong 已提交
1022

1023
  // TODO stop timer
L
Liu Jicong 已提交
1024 1025 1026 1027
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
L
Liu Jicong 已提交
1028

H
Haojun Liao 已提交
1029 1030 1031 1032 1033
  if (tmq->delayedTask) {
    taosCloseQueue(tmq->delayedTask);
  }

  taosFreeQall(tmq->qall);
1034
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
1035

1036
  taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
1037 1038
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
1039 1040

  tscDebug("consumer:0x%" PRIx64 " closed", id);
L
Liu Jicong 已提交
1041 1042
}

1043 1044 1045 1046 1047 1048 1049 1050 1051
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
1052
  if (tmqMgmt.rsetId < 0) {
1053 1054 1055 1056
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
1057
// Create a consumer from the given configuration.
//
// Steps: run the one-time TMQ initialisation, allocate the consumer and its queues, copy
// the configuration, initialise the response semaphore, connect to the server, register
// the consumer in the ref set and, when conf->hbBgEnable is set, start the background
// heartbeat timer.
//
// Returns the new consumer, or NULL on failure. errstr/errstrLen are not written by this
// function.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  // ref id handed to the heartbeat timer; released at _failed if the timer never starts
  int64_t* pHbRefId = NULL;

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->delayedTask = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
      conf->groupId[0] == 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId has not been filled in yet, so report the configured group id
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), conf->groupId);
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  taosInitRWLatch(&pTmq->lock);

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto _failed;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    // rspSem is destroyed by tmqFreeImpl below; destroying it here as well would destroy it twice
    goto _failed;
  }

  // Allocate the heartbeat ref id BEFORE the consumer is published in the ref set, so an
  // allocation failure can still be cleaned up through the plain _failed path.
  if (pTmq->hbBgEnable) {
    pHbRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pHbRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      goto _failed;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    goto _failed;
  }

  if (pHbRefId != NULL) {
    *pHbRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pHbRefId, tmqMgmt.timer);
  }

  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf, pTmq->hbBgEnable);

  return pTmq;

_failed:
  // NOTE(review): tmqFreeImpl destroys rspSem unconditionally, including on paths where
  // tsem_init was never reached or failed - confirm tsem_destroy tolerates that.
  taosMemoryFree(pHbRefId);
  tmqFreeImpl(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1149
// Subscribe the consumer to the topics in topic_list (an empty list unsubscribes).
//
// The subscribe request is sent to the mnode and this call blocks until the response
// callback posts the semaphore. Afterwards the endpoints are requested, retrying every
// 500ms while the mnode reports TSDB_CODE_MND_CONSUMER_NOT_READY, and finally the ep
// timer and (when auto commit is on) the commit timer are started if not yet running.
//
// Returns 0 on success or an error code; every allocation failure is reported as
// TSDB_CODE_OUT_OF_MEMORY.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const int32_t   MAX_RETRY_COUNT = 120 * 2;  // let's wait for 2 mins at most
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  req.withTbName = tmq->withTbName;
  req.useSnapshot = tmq->useSnapshot;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.resetOffsetCfg = tmq->resetOffsetCfg;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      // without an error code this path used to return 0 (success)
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      // the array did not take ownership, so the name must be released here
      taosMemoryFree(topicFName);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  SMqSubscribeCbParam param = { .rspErr = 0};
  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  // tmqSubscribeCb posts rspSem once the response (or a send error) arrives
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
    if (retryCnt++ > MAX_RETRY_COUNT) {
      tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    // the timer callback owns and frees this ref id
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    // the timer callback owns and frees this ref id
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  // shared exit for success and failure: buf/sendInfo are NULL once handed to the transport
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1274
// Store the commit callback and its user parameter in the configuration;
// tmq_consumer_new copies both into the consumer it creates.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1278

wmmhello's avatar
wmmhello 已提交
1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306
// Look up the client-side vgroup entry with the given vgId inside the topic(s) named
// topicName. Returns NULL when no such entry exists.
static SMqClientVg* getVgInfo(tmq_t* tmq, char* topicName, int32_t  vgId){
  const int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  for (int t = 0; t < numOfTopics; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, topicName) != 0) {
      continue;  // not the topic being searched for
    }

    const int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < numOfVgs; ++v) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId == vgId) {
        return pVg;
      }
    }
  }

  return NULL;
}

// Return the first client topic whose name equals topicName, or NULL when there is none.
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){
  const int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  int idx = 0;
  while (idx < numOfTopics) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (0 == strcmp(pCandidate->topicName, topicName)) {
      return pCandidate;
    }
    ++idx;
  }

  return NULL;
}

D
dapan1121 已提交
1307
// Response callback for a poll request sent to a vnode.
//
// param is the SMqPollCbParam created for the request and is always freed here.
// pMsg->pData is always released here; pMsg->pEpSet is either released or handed over to
// the response wrapper that is put into tmq->mqueue.
//
// Outcomes:
//  - consumer already closed: buffers are freed, terrno is set to
//    TSDB_CODE_TMQ_CONSUMER_CLOSED and -1 is returned;
//  - error code, unusable payload or out of memory: the vgroup is put back to IDLE (only
//    if the epoch is unchanged), rspSem is posted and -1 is returned;
//  - response from an earlier epoch: it is dropped, rspSem is posted and 0 is returned;
//  - otherwise the response is decoded into a wrapper, queued, rspSem is posted and 0 is
//    returned.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;

  int64_t refId = pParam->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t  epoch = pParam->epoch;
  int32_t  vgId = pParam->vgId;
  uint64_t requestId = pParam->requestId;

  if (code != 0) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);

    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      // consumer mismatch: switch to RECOVER and let the poll loop handle the re-balance
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
               tmq->consumerId);
    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pRspWrapper == NULL) {
        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
                tmq->consumerId, vgId, epoch, requestId);
        goto CREATE_MSG_FAIL;
      }

      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
    } else{
      tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
               vgId, epoch, tstrerror(code), requestId);
    }

    goto CREATE_MSG_FAIL;
  }

  // Everything below reads an SMqRspHead from the payload and decodes
  // (len - sizeof(SMqRspHead)) bytes after it, so the payload must at least hold the head.
  if (pMsg->pData == NULL || (size_t)pMsg->len < sizeof(SMqRspHead)) {
    tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since rsp is shorter than its header"
             ", reqId:0x%" PRIx64,
             tmq->consumerId, vgId, epoch, requestId);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < clientEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("consumer:0x%" PRIx64
            " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);

    tsem_post(&tmq->rspSem);
    taosReleaseRef(tmqMgmt.rsetId, refId);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pParam);

    return 0;
  }

  if (msgEpoch != clientEpoch) {
    tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId,
            epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->reqId = requestId;
  pRspWrapper->pEpset = pMsg->pEpSet;
  pRspWrapper->vgId = vgId;
  strcpy(pRspWrapper->topicName, pParam->topicName);

  // ownership of the ep set moved to the wrapper
  pMsg->pEpSet = NULL;
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    // the head is copied over the start of the decoded response after decoding
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    char buf[TSDB_OFFSET_LEN];
    tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset);
    tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {  // invalid rspType
    // NOTE(review): the wrapper is still queued with the unknown type below - confirm the
    // consumer side of mqueue copes with it.
    tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
  }

  taosMemoryFree(pMsg->pData);
  taosWriteQitem(tmq->mqueue, pRspWrapper);

  int32_t total = taosQueueItemSize(tmq->mqueue);
  tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
           tmq->consumerId, rspType, vgId, total, requestId);

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pParam);

  return 0;

CREATE_MSG_FAIL:
  // put the vgroup back to IDLE, but only if the endpoints were not updated meanwhile
  if (epoch == tmq->epoch) {
    taosWLockLatch(&tmq->lock);
    SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
    if(pVg){
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
    }
    taosWUnLockLatch(&tmq->lock);
  }

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pParam);

  return -1;
}

H
Haojun Liao 已提交
1458 1459 1460 1461 1462
// Per-vgroup state (offset and row count) saved from an existing client vgroup entry
// and looked up by topic/vgroup key when the client topics are rebuilt.
typedef struct SVgroupSaveInfo {
  STqOffsetVal offset;     // saved from the vgroup's offsetInfo.currentOffset
  int64_t      numOfRows;  // saved from the vgroup's numOfRows
} SVgroupSaveInfo;

H
Haojun Liao 已提交
1463 1464 1465 1466 1467 1468
// Fill a client topic from one topic entry of an ask-ep response.
//
// The schema is moved (not copied) out of pTopicEp, whose schema fields are reset. One
// client vgroup entry is created per vgroup in the response; its offsets and row count
// are taken from pVgOffsetHashMap when the topic/vgroup key is present, otherwise the
// offset starts from the consumer's reset-offset configuration with zero rows.
// If the vgroup array cannot be allocated, pTopic->vgs is left NULL and no entries are added.
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
                                   tmq_t* tmq) {
  pTopic->schema = pTopicEp->schema;
  pTopicEp->schema.nCols = 0;
  pTopicEp->schema.pSchema = NULL;

  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);

  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);

  tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
  if (pTopic->vgs == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to allocate vgroup list for topic:%s", tmq->consumerId,
             pTopic->topicName);
    return;
  }

  for (int32_t j = 0; j < vgNumGet; j++) {
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);

    makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));

    // defaults for a vgroup that has no saved state in the map
    int64_t      numOfRows = 0;
    STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
    if (pInfo != NULL) {
      offsetNew = pInfo->offset;
      numOfRows = pInfo->numOfRows;
    }

    SMqClientVg clientVg = {
        .pollCnt = 0,
        .vgId = pVgEp->vgId,
        .epSet = pVgEp->epSet,
        .vgStatus = TMQ_VG_STATUS__IDLE,
        .vgSkipCnt = 0,
        .emptyBlockReceiveTs = 0,
        .numOfRows = numOfRows,
    };

    clientVg.offsetInfo.currentOffset = offsetNew;
    clientVg.offsetInfo.committedOffset = offsetNew;
    clientVg.offsetInfo.walVerBegin = -1;
    clientVg.offsetInfo.walVerEnd = -1;
    clientVg.seekUpdated = false;
    clientVg.receivedInfoFromVnode = false;

    taosArrayPush(pTopic->vgs, &clientVg);
  }
}

static void freeClientVgInfo(void* param) {
  SMqClientTopic* pTopic = param;
  if (pTopic->schema.nCols) {
    taosMemoryFreeClear(pTopic->schema.pSchema);
  }

  taosArrayDestroy(pTopic->vgs);
}

1521
// Rebuild the consumer's local topic/vgroup endpoint cache from an ask-ep response.
//
//   tmq    consumer whose cache (tmq->clientTopics), status and epoch are updated
//   epoch  epoch carried by the response; nothing happens unless it is newer than tmq->epoch
//   pRsp   decoded response listing the topics currently assigned to this consumer
//
// The current offset and row count of every cached vgroup are first saved in a temporary
// hash keyed by the topic/vgroup key. That hash is handed to initClientTopicFromRsp() while
// the new topic list is built, and the finished list is swapped in under tmq->lock.
// Afterwards the status becomes NO_TOPIC (empty response) or READY, and the epoch is stored.
//
// Returns false on every path, including a successful update.
// NOTE(review): the callers visible in this file ignore the result; confirm before giving
// the return value a meaning.
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);

  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
           tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
  if (epoch <= tmq->epoch) {
    return false;
  }

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pVgOffsetHashMap == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // todo extract method
  // remember the progress (offset + consumed rows) of every vgroup known so far
  for (int32_t i = 0; i < topicNumCur; i++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);

        char buf[TSDB_OFFSET_LEN];
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset);
        tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
                 vgKey, buf);

        SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows};
        taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
      }
    }
  }

  // build the new topic list, seeding each vgroup with its saved progress when available
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
    if (taosArrayPush(newTopics, &topic) == NULL) {
      // the array did not take a copy, so the resources built for this topic are still ours
      tscError("consumer:0x%" PRIx64 ", failed to save topic:%s into local ep list", tmq->consumerId, topic.topicName);
      freeClientVgInfo(&topic);
    }
  }

  taosHashCleanup(pVgOffsetHashMap);

  taosWLockLatch(&tmq->lock);
  // destroy current buffered existed topics info
  if (tmq->clientTopics) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
  }
  tmq->clientTopics = newTopics;
  taosWUnLockLatch(&tmq->lock);

  int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
  atomic_store_8(&tmq->status, flag);
  atomic_store_32(&tmq->epoch, epoch);

  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
  return false;
}

1592
// Callback invoked when the reply to an ask-ep (endpoint query) request arrives.
//
//   param  SMqAskEpCbParam allocated by the sender; freed here
//   pMsg   reply buffer; pMsg->pData and pMsg->pEpSet are freed here on every path
//   code   result code of the request
//
// If the consumer can no longer be acquired, the buffers are released and
// TSDB_CODE_TMQ_CONSUMER_CLOSED is returned (also stored in terrno); the user callback is
// NOT invoked. Otherwise the user callback is invoked exactly once: with a NULL buffer
// when code reports a failure, with pMsg when it reports success. Returns code.
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    code = terrno;
  } else {
    if (code != TSDB_CODE_SUCCESS) {
      tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
      pParam->pUserFn(tmq, code, NULL, pParam->pParam);
    } else {
      // tmq's epoch is monotonically increase,
      // so it's safe to discard any old epoch msg.
      // Epoch will only increase when received newer epoch ep msg
      SMqRspHead* head = pMsg->pData;
      int32_t     epoch = atomic_load_32(&tmq->epoch);
      if (head->epoch <= epoch) {
        tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
                 tmq->consumerId, head->epoch, epoch);

        // a recovering consumer still has to leave RECOVER even when the epoch did not move;
        // status is read atomically, like every other cross-thread access to it in this file
        if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
          SMqAskEpRsp rsp;
          tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
          int8_t flag =
              (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
          atomic_store_8(&tmq->status, flag);
          tDeleteSMqAskEpRsp(&rsp);
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
                 head->epoch, epoch);
      }

      pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
    }

    taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
  }

  // single cleanup point shared by all paths
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pParam);
  return code;
}

L
Liu Jicong 已提交
1649
// Populate a poll request aimed at one vgroup of one subscribed topic.
//
//   pReq     request to fill in
//   tmq      consumer issuing the poll (group id, consumer id, epoch, flags)
//   timeout  value copied into the request
//   pTopic   topic being polled; its name becomes the tail of the subscription key
//   pVg      target vgroup; supplies the vgroup id and the offset to resume from
//
// The subscription key is "<groupId><TMQ_SEPARATOR><topicName>". A new request id is
// generated on every call.
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  char*   pKey = pReq->subKey;
  int32_t prefixLen = strlen(tmq->groupId);

  memcpy(pKey, tmq->groupId, prefixLen);
  pKey[prefixLen] = TMQ_SEPARATOR;
  // NOTE(review): unbounded copy; presumably subKey is sized for group id + separator + topic name - confirm
  strcpy(&pKey[prefixLen + 1], pTopic->topicName);

  // routing / target
  pReq->head.vgId = pVg->vgId;
  pReq->reqOffset = pVg->offsetInfo.currentOffset;

  // consumer-level settings
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;

  pReq->timeout = timeout;
  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1665 1666
// Build the result object returned to the user for a meta poll response.
//
// Copies topic name, db name and vgroup id from the handles stored in pWrapper, then takes
// a shallow copy of pWrapper->metaRsp. pWrapper->topicHandle and pWrapper->vgHandle must
// already be resolved by the caller.
//
// Returns a newly allocated object, or NULL when the allocation fails.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

1676
// Build the result object returned to the user for a data poll response.
//
//   pWrapper   dequeued poll response; topicHandle and vgHandle must already be resolved
//   pVg        vgroup the response came from; its numOfRows counter is advanced
//   numOfRows  [out] number of rows contained in this response (reset to 0 first)
//
// The data response is shallow-copied out of the wrapper. When the response does not
// carry its own schema, the topic's schema is installed into the result info.
//
// Returns a newly allocated object, or NULL when the allocation fails (in which case
// *numOfRows is 0 and pVg is untouched).
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  (*numOfRows) = 0;

  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);

  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }

  return pRspObj;
}

1706
// Build the result object returned to the user for a taosx (meta + data) poll response.
//
//   pWrapper   dequeued poll response; topicHandle and vgHandle must already be resolved
//   pVg        vgroup the response came from; its numOfRows counter is advanced
//   numOfRows  [out] number of rows contained in this response (reset to 0 first, the
//              same way tmqBuildRspFromWrapper does)
//
// The taosx response is shallow-copied out of the wrapper. When the response does not
// carry its own schema, the topic's schema is installed into the result info.
//
// Returns a newly allocated object, or NULL when the allocation fails (in which case
// *numOfRows is 0 and pVg is untouched).
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  (*numOfRows) = 0;

  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }
  return pRspObj;
}

1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763
// Failure path used when a poll request could not be prepared: put the vgroup back
// into the IDLE state, post the consumer's response semaphore, and report -1.
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
  const int32_t failure = -1;

  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&pTmq->rspSem);

  return failure;
}

// Serialize and asynchronously send one poll request to the vnode hosting pVg.
//
//   pTmq     consumer issuing the poll
//   pTopic   topic being polled
//   pVg      target vgroup; its pollCnt is bumped and seekUpdated is cleared after sending
//   timeout  value forwarded into the request
//
// The callback parameter carries the topic name and vgroup id by value rather than
// pointers to pTopic/pVg (see the original note: "pVg may be released").
//
// Returns TSDB_CODE_SUCCESS once the request has been handed to the transport, or -1
// (via handleErrorBeforePoll, which also resets the vgroup to IDLE) when serialization or
// an allocation fails.
// NOTE(review): the result of asyncSendMsgToServer() is not inspected - confirm whether a
// send failure needs to reset the vgroup status as well.
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq req = {0};
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first pass computes the encoded size, second pass writes into the buffer
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    return handleErrorBeforePoll(pVg, pTmq);
  }

  char* msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return handleErrorBeforePoll(pVg, pTmq);
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  // zero-initialized so that fields not assigned below never hold indeterminate values
  SMqPollCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  // bounded copy, consistent with the other topic-name copies in this file
  tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  int64_t transporterId = 0;
  char    offsetFormatBuf[TSDB_OFFSET_LEN];
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);

  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

  pVg->pollCnt++;
  pVg->seekUpdated = false;  // reset this flag.
  pTmq->pollCnt++;

  return TSDB_CODE_SUCCESS;
}

1799
// broadcast the poll request to all related vnodes
H
Haojun Liao 已提交
1800
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1801 1802 1803
  if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
    return 0;
  }
1804
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1805
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
1806 1807

  for (int i = 0; i < numOfTopics; i++) {
X
Xiaoyu Wang 已提交
1808
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
X
Xiaoyu Wang 已提交
1809
    int32_t         numOfVg = taosArrayGetSize(pTopic->vgs);
1810 1811

    for (int j = 0; j < numOfVg; j++) {
X
Xiaoyu Wang 已提交
1812
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
X
Xiaoyu Wang 已提交
1813
      if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) {  // less than 100ms
1814
        tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
X
Xiaoyu Wang 已提交
1815
                 tmq->epoch, pVg->vgId);
H
Haojun Liao 已提交
1816 1817 1818
        continue;
      }

1819
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1820
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1821
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
1822
        tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
X
Xiaoyu Wang 已提交
1823
                 pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1824 1825
        continue;
      }
1826

L
Liu Jicong 已提交
1827
      atomic_store_32(&pVg->vgSkipCnt, 0);
1828 1829 1830
      int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
D
dapan1121 已提交
1831
      }
X
Xiaoyu Wang 已提交
1832 1833
    }
  }
1834

1835
  tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
X
Xiaoyu Wang 已提交
1836 1837 1838
  return 0;
}

H
Haojun Liao 已提交
1839
// Handle a queued message that is not a poll response.
//
// Only endpoint (EP) responses are understood; any other type yields -1 and leaves
// *pReset untouched. For an EP response:
//   - newer epoch than the consumer's: the local endpoint cache is rebuilt, the decoded
//     response is released, and *pReset is set to true;
//   - otherwise: the wrapper's payload is released and *pReset is set to false.
// Returns 0 in both EP cases. The queue item itself is not freed here.
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool staleEpoch = !(rspWrapper->epoch > atomic_load_32(&tmq->epoch));
  if (staleEpoch) {
    tmqFreeRspWrapper(rspWrapper);
    *pReset = false;
    return 0;
  }

  SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
  doUpdateLocalEp(tmq, rspWrapper->epoch, pEpRsp);
  tDeleteSMqAskEpRsp(pEpRsp);
  *pReset = true;
  return 0;
}

H
Haojun Liao 已提交
1859
// Drain the consumer's response queue until a result for the user can be built.
//
//   tmq          consumer whose queues (qall, mqueue) are read
//   timeout      forwarded to tmqPollImpl() when a re-poll is triggered
//   pollIfReset  when true, a newly applied endpoint update triggers an immediate re-poll
//
// Messages are taken from tmq->qall, refilled from tmq->mqueue when empty. Per type:
//   END_RSP        item freed, terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET, returns NULL
//   POLL_RSP       data response: updates epset/offset/wal range of the vgroup; returns an
//                  SMqRspObj unless the response holds no block
//   POLL_META_RSP  meta response: returns an SMqMetaRspObj
//   TAOSX_RSP      meta+data response: returns an SMqRspObj or SMqTaosxRspObj unless empty
//   anything else  passed to tmqHandleNoPollRsp()
// Poll responses whose epoch differs from the consumer's current epoch are discarded.
//
// Returns the result object, or NULL when the queue is exhausted, on END_RSP, or when
// the vgroup/topic named by a response is no longer known locally. In that last case the
// response and its payload are released before returning.
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);

  while (1) {
    SMqRspWrapper* pRspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&pRspWrapper);

    if (pRspWrapper == NULL) {
      // local batch exhausted: pull whatever arrived in the meantime and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pRspWrapper);
      if (pRspWrapper == NULL) {
        return NULL;
      }
    }

    tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);

    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pRspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
      return NULL;
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;

      int32_t     consumerEpoch = atomic_load_32(&tmq->epoch);
      SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;

      if (pDataRsp->head.epoch == consumerEpoch) {
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          // the item is already off the queue, so it must be released here
          tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          return NULL;
        }

        // update the epset
        if (pollRspWrapper->pEpset != NULL) {
          SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
          SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
          tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId, pVg->vgId,
                   pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
          pVg->epSet = *pollRspWrapper->pEpset;
        }

        // update the local offset value only for the returned values, only when the local offset is NOT updated
        // by tmq_offset_seek function
        if (!pVg->seekUpdated) {
          tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
          pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
        } else {
          tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
        }

        // update the status
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        // update the valid wal version range
        pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
        pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
        pVg->receivedInfoFromVnode = true;

        char buf[TSDB_OFFSET_LEN];
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
        if (pDataRsp->blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   " total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          // remembered so that tmqPollImpl() backs off before polling this vgroup again
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          taosFreeQitem(pollRspWrapper);
        } else {  // build rsp
          int64_t    numOfRows = 0;
          SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
          tmq->totalRows += numOfRows;
          pVg->emptyBlockReceiveTs = 0;
          tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
                   pollRspWrapper->reqId);
          taosFreeQitem(pollRspWrapper);
          return pRsp;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      // todo handle the wal range and epset for each vgroup
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          // the item is already off the queue, so it must be released here
          tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          return NULL;
        }

        if (pollRspWrapper->metaRsp.rspOffset.type != 0) {  // if offset is validate
          pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
        }

        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          // the item is already off the queue, so it must be released here
          tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          return NULL;
        }

        // update the local offset value only for the returned values, only when the local offset is NOT updated
        // by tmq_offset_seek function
        if (!pVg->seekUpdated) {
          if (pollRspWrapper->taosxRsp.rspOffset.type != 0) {  // if offset is validate
            tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
            pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
          }
        } else {
          tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
        }

        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          continue;
        } else {
          pVg->emptyBlockReceiveTs = 0;  // reset the ts
        }

        // build rsp: a plain data result unless the response also carries created tables
        void*   pRsp = NULL;
        int64_t numOfRows = 0;
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
        } else {
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
        }

        tmq->totalRows += numOfRows;

        char buf[TSDB_OFFSET_LEN];
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
        tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                 ", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64,
                 tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
                 tmq->totalRows, pollRspWrapper->reqId);

        taosFreeQitem(pollRspWrapper);
        return pRsp;

      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else {
      tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);

      bool reset = false;
      tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
      taosFreeQitem(pRspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
    }
  }
}

2060
// Poll the subscribed topics for the next result.
//
//   tmq      consumer handle
//   timeout  maximum time to wait in milliseconds; a negative value waits without a
//            deadline (the loop wakes up at least every 1000 ms)
//
// Returns a result object, or NULL when
//   - the consumer is still in the INIT state (after sleeping 500 ms),
//   - the consumer is recovering and the endpoint query kept answering "not ready"
//     after more than 40 retries,
//   - terrno equals TSDB_CODE_TQ_NO_COMMITTED_OFFSET after handling the responses, or
//   - the timeout elapsed without a result.
// A failure reported by tmqPollImpl() is only logged; the loop keeps going.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int64_t beginTs = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, beginTs,
           timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  // while recovering, keep asking for the endpoints until the status changes
  while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    for (int32_t retryCnt = 0; doAskEp(tmq) == TSDB_CODE_MND_CONSUMER_NOT_READY;) {
      if (retryCnt++ > 40) {
        return NULL;
      }

      tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
      taosMsleep(500);
    }
  }

  for (;;) {
    tmqHandleAllDelayedTask(tmq);

    if (tmqPollImpl(tmq, timeout) < 0) {
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    void* pRsp = tmqHandleAllRsp(tmq, timeout, false);
    if (pRsp != NULL) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, pRsp);
      return (TAOS_RES*)pRsp;
    }
    if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout < 0) {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
      continue;
    }

    int64_t nowTs = taosGetTimestampMs();
    int64_t spent = nowTs - beginTs;
    if (spent > timeout) {
      tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
               tmq->consumerId, tmq->epoch, beginTs, nowTs);
      return NULL;
    }
    // wait for a response to arrive, but no longer than the time that is left
    tsem_timewait(&tmq->rspSem, (timeout - spent));
  }
}

2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131
/*
 * Write the consumer's accumulated statistics to the debug log: first the overall
 * poll/row counters, then one line per vgroup of every topic in pTmq->clientTopics.
 */
static void displayConsumeStatistics(const tmq_t* pTmq) {
  const int32_t topicNum = taosArrayGetSize(pTmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicNum, pTmq->epoch);
  tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);

  int32_t topicIdx = 0;
  while (topicIdx < topicNum) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, topicIdx);
    tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, topicIdx);

    // per-vgroup row distribution of this topic
    const int32_t vgNum = taosArrayGetSize(pTopic->vgs);
    for (int32_t vgIdx = 0; vgIdx < vgNum; vgIdx++) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, vgIdx);
      tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, vgIdx, pClientVg->vgId,
               pClientVg->numOfRows);
    }

    topicIdx++;
  }

  tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
2137

2138
/*
 * Close a consumer.
 *
 * When the consumer is in the READY state: commit all offsets synchronously if
 * auto-commit is enabled, wait for the heartbeat, then unsubscribe by subscribing
 * to an empty topic list (retried while the mnode answers "consumer not ready").
 * In every state the consumer reference is finally removed from the tmq ref set.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL handle, the commit error code if the
 * final auto-commit fails (the consumer is NOT closed in that case), 0 otherwise.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  tscDebug("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
  displayConsumeStatistics(tmq);

  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
    // if auto commit is set, commit before close consumer. Otherwise, do nothing.
    if (tmq->autoCommit) {
      int32_t rsp = tmq_commit_sync(tmq, NULL);
      if (rsp != 0) {
        return rsp;
      }
    }
    taosSsleep(2);  // sleep 2s for hb to send offset and rows to server

    int32_t     retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
    if (lst == NULL) {
      // without the empty list there is nothing to hand to tmq_subscribe; still release the handle below
      tscError("consumer:0x%" PRIx64 " failed to create empty topic list, skip unsubscribe", tmq->consumerId);
    } else {
      // subscribing to an empty list is the unsubscribe request
      while (1) {
        int32_t rsp = tmq_subscribe(tmq, lst);
        if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
          break;
        } else {
          retryCnt++;
          taosMsleep(500);
        }
      }

      tmq_list_destroy(lst);
    }
  } else {
    tscWarn("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
2172

L
Liu Jicong 已提交
2173 2174
/*
 * Translate a tmq return code into a readable string: 0 and -1 map to the fixed
 * strings "success" and "fail"; every other value is passed to tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
2182

L
Liu Jicong 已提交
2183 2184 2185 2186 2187
/*
 * Classify a result object: data, table meta, meta+data, or invalid when it is
 * none of the tmq result kinds.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }

  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }

  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
2195
/*
 * Return the topic name carried by a tmq result, with the leading "<prefix>."
 * part (everything up to and including the first '.') stripped.
 *
 * Returns NULL when res is NULL or is not a tmq result. If the stored name
 * contains no '.', the whole stored name is returned instead of computing a
 * pointer from a NULL strchr() result.
 * The returned pointer refers to storage inside the result object.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2210 2211 2212 2213
/*
 * Return the database name carried by a tmq result, with the leading "<prefix>."
 * part (everything up to and including the first '.') stripped.
 *
 * Returns NULL when res is NULL or is not a tmq result. If the stored name
 * contains no '.', the whole stored name is returned instead of computing a
 * pointer from a NULL strchr() result.
 * The returned pointer refers to storage inside the result object.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2225 2226 2227 2228
/*
 * Return the vgroup id stored in a tmq result (data, meta or meta+data),
 * or -1 when res is none of those result kinds.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }

  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }

  return -1;
}
L
Liu Jicong 已提交
2239

2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262
/*
 * Return the WAL version recorded as the response offset of a tmq result.
 * Only offsets of type TMQ_OFFSET__LOG carry a version; for any other offset
 * type (data from tsdb) or an unrecognized result kind, -1 is returned.
 */
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    if (pData->rsp.rspOffset.type != TMQ_OFFSET__LOG) {
      return -1;
    }
    return pData->rsp.rspOffset.version;
  }

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    if (pMeta->metaRsp.rspOffset.type != TMQ_OFFSET__LOG) {
      return -1;
    }
    return pMeta->metaRsp.rspOffset.version;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    if (pTaosx->rsp.rspOffset.type != TMQ_OFFSET__LOG) {
      return -1;
    }
    return pTaosx->rsp.rspOffset.version;
  }

  // not a tmq result: no valid offset info
  return -1;
}

L
Liu Jicong 已提交
2263 2264 2265 2266 2267 2268 2269
/*
 * Return the table name of the block the result iterator currently points at.
 * NULL is returned for meta-only or unknown results, when the response carries
 * no table names, or when resIter is outside [0, blockNum).
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       hasNames = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
    bool       iterValid = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
    if (hasNames && iterValid) {
      return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    bool            hasNames = pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL;
    bool            iterValid = pTaosx->resIter >= 0 && pTaosx->resIter < pTaosx->rsp.blockNum;
    if (hasNames && iterValid) {
      return (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter);
    }
    return NULL;
  }

  return NULL;
}
2281

2282 2283 2284 2285
/*
 * Start an asynchronous offset commit; cb is handed to the commit routine together
 * with param. A NULL pRes commits every offset of the consumer, otherwise only the
 * offset carried by pRes is committed.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (pRes != NULL) {
    // a specific result was supplied: commit only its offset
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param);
    return;
  }

  // no result supplied: commit all offsets
  asyncCommitAllOffsets(tmq, cb, param);
}

2290
/*
 * Commit-completion callback used by the synchronous wrappers: stores the result
 * code in the SSyncCommitInfo passed as param and posts its semaphore.
 */
static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSyncInfo = param;

  pSyncInfo->code = code;       // publish the result before posting the semaphore
  tsem_post(&pSyncInfo->sem);
}
2295

2296 2297 2298 2299 2300 2301 2302 2303 2304
/*
 * Commit offsets and block until the commit callback has fired.
 *
 * pRes == NULL commits all offsets of the consumer; otherwise only the offset
 * carried by pRes is committed.
 *
 * Returns the code reported by the commit callback, TSDB_CODE_INVALID_PARA for a
 * NULL handle, or TSDB_CODE_OUT_OF_MEMORY if the wait context cannot be allocated.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  if (tmq == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to prepare sync commit", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  if (pRes == NULL) {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
  } else {
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo);
  }

  // commitCallBackFn stores the result in pInfo->code and posts the semaphore
  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  tscDebug("consumer:0x%" PRIx64 " sync commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}

/*
 * ask-ep callback for the synchronous path (doAskEp): records the result code in
 * the SAskEpInfo passed as param, applies the decoded endpoint response to the
 * consumer on success, and finally posts the semaphore in SAskEpInfo.
 */
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  SAskEpInfo* pAskInfo = param;
  pAskInfo->code = code;

  if (TSDB_CODE_SUCCESS != code) {
    tsem_post(&pAskInfo->sem);
    return;
  }

  // the payload starts with a SMqRspHead, followed by the encoded SMqAskEpRsp
  SMqRspHead* pHead = pDataBuf->pData;
  void*       pBody = POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead));

  SMqAskEpRsp epRsp;
  tDecodeSMqAskEpRsp(pBody, &epRsp);
  doUpdateLocalEp(pTmq, pHead->epoch, &epRsp);
  tDeleteSMqAskEpRsp(&epRsp);

  tsem_post(&pAskInfo->sem);
}

/*
 * ask-ep callback for the asynchronous path: wraps the decoded endpoint response
 * in a SMqAskEpRspWrapper queue item and writes it to pTmq->mqueue.
 * On a failed request or allocation failure only terrno is set; param is unused.
 */
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    return;
  }

  SMqAskEpRspWrapper* pItem = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
  if (NULL == pItem) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  SMqRspHead* pHead = pDataBuf->pData;
  pItem->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
  pItem->epoch = pHead->epoch;

  // keep the original order: the raw head bytes are copied first, then the decoder fills msg
  memcpy(&pItem->msg, pDataBuf->pData, sizeof(SMqRspHead));
  tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pItem->msg);

  taosWriteQitem(pTmq->mqueue, pItem);
}

/*
 * Ask the mnode for the consumer's endpoints and wait for the answer.
 *
 * Sends the request through asyncAskEp() with updateEpCallbackFn, which stores the
 * result code in the wait context and posts its semaphore.
 *
 * Returns the code reported by the callback, or TSDB_CODE_OUT_OF_MEMORY if the
 * wait context cannot be allocated.
 */
int32_t doAskEp(tmq_t* pTmq) {
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEp info", pTmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;  // the malloc'ed context is otherwise uninitialized

  asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
  tsem_wait(&pInfo->sem);

  int32_t code = pInfo->code;
  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);
  return code;
}

/*
 * Build a TDMT_MND_TMQ_ASK_EP request for this consumer and hand it to the
 * transport layer without waiting for the reply.
 *
 * askEpFn - user callback; stored with `param` in the SMqAskEpCbParam attached to
 *           the request (the transport-level callback is askEpCallbackFn). If the
 *           request cannot be built, askEpFn is invoked right here with an error
 *           code and a NULL data buffer.
 * param   - opaque pointer handed back to askEpFn.
 */
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
  SMqAskEpReq req = {0};
  req.consumerId = pTmq->consumerId;
  req.epoch = pTmq->epoch;
  // bounded copy: never write past req.cgroup even if groupId is longer than expected
  tstrncpy(req.cgroup, pTmq->groupId, tListLen(req.cgroup));

  // first pass computes the encoded size only
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
    taosMemoryFree(pReq);

    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pReq);

    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  // the callback identifies the consumer by ref id rather than by pointer
  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  pParam->pUserFn = askEpFn;
  pParam->pParam = param;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = askEpCallbackFn;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);

  // NOTE(review): from here on pReq, pParam and sendInfo are presumably owned by the transport layer — confirm
  int64_t transporterId = 0;
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
}

/*
 * Format "<topicName>:<vg>" into dst and return the number of characters written
 * (excluding the terminating NUL). dst must be large enough; no bound is applied.
 */
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  int32_t keyLen = sprintf(dst, "%s:%d", topicName, vg);
  return keyLen;
}

/*
 * Finish a commit: invoke the user callback stored in pParamSet with the
 * accumulated code, then free pParamSet.
 *
 * The consumer is looked up by ref id; if it can no longer be acquired the
 * callback is skipped, terrno is set to TSDB_CODE_TMQ_CONSUMER_CLOSED and -1 is
 * returned. pParamSet is freed on both paths. Returns 0 on success.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  const int64_t consumerRef = pParamSet->refId;

  tmq_t* pConsumer = taosAcquireRef(tmqMgmt.rsetId, consumerRef);
  if (pConsumer != NULL) {
    // no more responses are pending: report the final result to the user
    pParamSet->callbackFn(pConsumer, pParamSet->code, pParamSet->userParam);
    taosMemoryFree(pParamSet);

    taosReleaseRef(tmqMgmt.rsetId, consumerRef);
    return 0;
  }

  taosMemoryFree(pParamSet);
  terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
  return -1;
}

2457
/*
 * Account for one received commit response. The pending counter in pParamSet is
 * decremented atomically; the call that brings it to zero completes the commit
 * via tmqCommitDone(). consumerId, pTopic and vgId are used for logging only.
 */
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);

  if (remaining != 0) {
    tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remaining);
    return;
  }

  tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  tmqCommitDone(pParamSet);
}
2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489

/*
 * Advance the result iterator to the next data block and load that block into
 * the result-info of the response object.
 *
 * Returns the result-info for the new block, or NULL once the iterator has moved
 * past the last block (the iterator is incremented in either case).
 */
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
  SMqRspObj* pObj = (SMqRspObj*)res;

  pObj->resIter++;
  if (pObj->resIter >= pObj->rsp.blockNum) {
    return NULL;
  }

  SRetrieveTableRsp* pBlock = (SRetrieveTableRsp*)taosArrayGetP(pObj->rsp.blockData, pObj->resIter);

  if (pObj->rsp.withSchema) {
    // install the schema of this block, then drop the buffers kept from the previous block
    SSchemaWrapper* pSchemaWrapper = (SSchemaWrapper*)taosArrayGetP(pObj->rsp.blockSchema, pObj->resIter);
    setResSchemaInfo(&pObj->resInfo, pSchemaWrapper->pSchema, pSchemaWrapper->nCols);

    taosMemoryFreeClear(pObj->resInfo.row);
    taosMemoryFreeClear(pObj->resInfo.pCol);
    taosMemoryFreeClear(pObj->resInfo.length);
    taosMemoryFreeClear(pObj->resInfo.convertBuf);
    taosMemoryFreeClear(pObj->resInfo.convertJson);
  }

  setQueryResultFromRsp(&pObj->resInfo, pBlock, convertUcs4, false);
  return &pObj->resInfo;
}

2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512
/*
 * Response callback of a TDMT_VND_TMQ_VG_WALINFO request.
 *
 * On success the WAL range from the response head and the decoded response offset
 * are appended to the shared assignment list; on failure the error code is stored
 * in the shared context. The callback that accounts for the last expected response
 * posts the shared semaphore. pParam is freed here in all cases.
 *
 * The response counter is incremented only AFTER this callback has finished
 * touching the shared context: the waiter destroys that context as soon as the
 * semaphore is posted, so counting first would let the final callback release the
 * waiter while an earlier callback is still writing its result.
 */
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon*       pCommon = pParam->pCommon;

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
    pCommon->code = code;
  } else {
    // zero-init so a partially decoded response never exposes indeterminate fields
    SMqDataRsp rsp = {0};
    SDecoder   decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);

    SMqRspHead* pHead = pMsg->pData;

    tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                       .end = pHead->walever,
                                       .currentOffset = rsp.rspOffset.version,
                                       .vgId = pParam->vgId};

    taosThreadMutexLock(&pCommon->mutex);
    taosArrayPush(pCommon->pList, &assignment);
    taosThreadMutexUnlock(&pCommon->mutex);
  }

  // publish-then-count: pCommon must not be touched after the semaphore is posted
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == pParam->totalReq) {
    tsem_post(&pCommon->rsp);
  }

  taosMemoryFree(pParam);
  return 0;
}

/*
 * Release everything owned by the shared wal-info context (topic name copy,
 * result list, mutex, semaphore) and then the context itself.
 */
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  // member resources first; they are independent of each other
  taosMemoryFree(pCommon->pTopicName);
  taosArrayDestroy(pCommon->pList);
  taosThreadMutexDestroy(&pCommon->mutex);
  tsem_destroy(&pCommon->rsp);

  // the container goes last
  taosMemoryFree(pCommon);
}

H
Haojun Liao 已提交
2536
/*
 * Return the vgroup assignment (vgId, WAL begin/end version, current offset) of
 * one subscribed topic.
 *
 * tmq             - consumer handle
 * pTopicName      - topic name without the account prefix
 * assignment      - out: array allocated here; release it with tmq_free_assignment()
 * numOfAssignment - out: number of entries in *assignment
 *
 * If every vgroup already holds info received from its vnode the answer is built
 * from the local cache. Otherwise a wal-info request is sent to every vgroup of
 * the topic, the call blocks until all responses are counted, and the local offset
 * info is refreshed from the responses.
 *
 * On any failure *assignment is NULL and *numOfAssignment is 0.
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA (NULL argument / unknown
 * topic), TSDB_CODE_OUT_OF_MEMORY, or the error reported by a vnode.
 */
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
                                 int32_t* numOfAssignment) {
  if (assignment == NULL || numOfAssignment == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *numOfAssignment = 0;
  *assignment = NULL;

  if (tmq == NULL || pTopicName == NULL) {
    tscError("invalid tmq handle or topic name, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[128] = {0};
  // bounded: an over-long topic name is truncated (and then simply not found) instead of overflowing tname
  snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // in case of snapshot is opened, no valid offset will return
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);

  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
    *numOfAssignment = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // fast path: fill the answer from the local cache until a vgroup without vnode info is found
  bool needFetch = false;

  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (!pClientVg->receivedInfoFromVnode) {
      needFetch = true;
      break;
    }

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
      pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
    } else {
      pAssignment->currentOffset = 0;
    }

    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
    pAssignment->vgId = pClientVg->vgId;
  }

  if (!needFetch) {
    return TSDB_CODE_SUCCESS;
  }

  // slow path: ask every vgroup of the topic for its wal info
  SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
  if (pCommon == NULL) {
    taosMemoryFree(*assignment);
    *assignment = NULL;
    *numOfAssignment = 0;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
  tsem_init(&pCommon->rsp, 0, 0);
  taosThreadMutexInit(&pCommon->mutex, 0);
  pCommon->pTopicName = taosStrdup(pTopic->topicName);
  pCommon->consumerId = tmq->consumerId;

  // NOTE(review): if a failure below happens after some requests were already sent, their
  // callbacks still reference pCommon when it is destroyed at _fail; this is unchanged here.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  for (int32_t i = 0; i < (*numOfAssignment); ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);

    SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
    if (pParam == NULL) {
      goto _fail;
    }

    pParam->epoch = tmq->epoch;
    pParam->vgId = pClientVg->vgId;
    pParam->totalReq = *numOfAssignment;
    pParam->pCommon = pCommon;

    SMqPollReq req = {0};
    tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);

    int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
    if (msgSize < 0) {
      taosMemoryFree(pParam);
      goto _fail;
    }

    char* msg = taosMemoryCalloc(1, msgSize);
    if (NULL == msg) {
      taosMemoryFree(pParam);
      goto _fail;
    }

    if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
      taosMemoryFree(msg);
      taosMemoryFree(pParam);
      goto _fail;
    }

    SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (sendInfo == NULL) {
      taosMemoryFree(pParam);
      taosMemoryFree(msg);
      goto _fail;
    }

    sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
    sendInfo->requestId = req.reqId;
    sendInfo->requestObjRefId = 0;
    sendInfo->param = pParam;
    sendInfo->fp = tmqGetWalInfoCb;
    sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

    int64_t transporterId = 0;
    char    offsetFormatBuf[TSDB_OFFSET_LEN];
    tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);

    tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
             tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
    asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
  }

  // tmqGetWalInfoCb posts the semaphore once all responses have been counted
  tsem_wait(&pCommon->rsp);
  int32_t code = pCommon->code;

  terrno = code;
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(*assignment);
    *assignment = NULL;
    *numOfAssignment = 0;
  } else {
    int32_t num = taosArrayGetSize(pCommon->pList);
    for (int32_t i = 0; i < num; ++i) {
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
    }
    *numOfAssignment = num;
  }

  // refresh the locally cached offset info from what the vnodes reported
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    tmq_topic_assignment* p = &(*assignment)[j];

    for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
      if (pClientVg->vgId != p->vgId) {
        continue;
      }

      SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;

      pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
      pOffsetInfo->walVerBegin = p->begin;
      pOffsetInfo->walVerEnd = p->end;
      pOffsetInfo->currentOffset.version = p->currentOffset;
      pOffsetInfo->committedOffset.version = p->currentOffset;

      // format after the update so the log shows the new offset, not the stale one
      char offsetBuf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);

      tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
    }
  }

  destroyCommonInfo(pCommon);
  return code;

_fail:
  // request setup failed: release the shared context and do not hand a half-filled array to the caller
  destroyCommonInfo(pCommon);
  taosMemoryFree(*assignment);
  *assignment = NULL;
  *numOfAssignment = 0;
  return terrno;
}

T
t_max 已提交
2705 2706 2707 2708 2709 2710 2711 2712
/*
 * Release an assignment array; a NULL pointer is ignored.
 */
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
  if (pAssignment != NULL) {
    taosMemoryFree(pAssignment);
  }
}

2713
/*
 * Move the consume position of one vgroup of a subscribed topic to `offset`
 * and send the new position to the vnode, waiting for the reply.
 *
 * tmq        - consumer handle
 * pTopicName - topic name without the account prefix
 * vgId       - vgroup whose position is changed
 * offset     - target WAL version; when the current offset is a WAL version it
 *              must lie inside [walVerBegin, walVerEnd] of the cached offset info
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL argument, unknown topic/vgroup, an
 * offset type that does not allow seeking or an out-of-range offset;
 * TSDB_CODE_OUT_OF_MEMORY if the wait context cannot be allocated; otherwise the
 * code reported by the commit callback.
 */
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  if (pTopicName == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name, null", tmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[128] = {0};
  // bounded: an over-long topic name is truncated (and then simply not found) instead of overflowing tname
  snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqClientVg* pVg = NULL;
  int32_t      numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg->vgId == vgId) {
      pVg = pClientVg;
      break;
    }
  }

  if (pVg == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
    return TSDB_CODE_INVALID_PARA;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;

  // seeking is only allowed for WAL-version offsets and for (negative) reset offsets
  int32_t type = pOffsetInfo->currentOffset.type;
  if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
    return TSDB_CODE_INVALID_PARA;
  }

  if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
    tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
             tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
    return TSDB_CODE_INVALID_PARA;
  }

  // update the offset, and then commit to vnode
  if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
    pOffsetInfo->currentOffset.version = offset;
    pOffsetInfo->committedOffset.version = INT64_MIN;
    pVg->seekUpdated = true;
  }

  // a stack-local response object carrying only topic and vgId identifies the target of the seek
  SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
  tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));

  tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);

  // commitCallBackFn stores the result in pInfo->code and posts the semaphore
  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId,
             tstrerror(code));
  }

  return code;
}