clientTmq.c 102.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22
#include "tdef.h"
#include "tglobal.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

X
Xiaoyu Wang 已提交
27 28
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
29

30 31
#define OFFSET_IS_RESET_OFFSET(_of)  ((_of) < 0)

32 33
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);

X
Xiaoyu Wang 已提交
34
struct SMqMgmt {
35 36 37
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
38
};
L
Liu Jicong 已提交
39

X
Xiaoyu Wang 已提交
40 41
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
42
static struct SMqMgmt tmqMgmt = {0};
43

L
Liu Jicong 已提交
44 45 46 47 48 49
// Leading fields shared by the response wrappers declared in this file
// (SMqAskEpRspWrapper and SMqPollRspWrapper start with the same two members).
typedef struct {
  int8_t  tmqRspType;  // response kind
  int32_t epoch;       // epoch carried with the response
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
50 51 52
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
53 54
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
55
struct tmq_list_t {
L
Liu Jicong 已提交
56
  SArray container;
L
Liu Jicong 已提交
57
};
L
Liu Jicong 已提交
58

L
Liu Jicong 已提交
59
struct tmq_conf_t {
60 61 62 63 64 65 66 67
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
68 69 70 71 72
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
73
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
74
  void*          commitCbUserParam;
L
Liu Jicong 已提交
75 76 77
};

struct tmq_t {
78 79 80 81 82 83 84
  int64_t        refId;
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
  int8_t         withTbName;
  int8_t         useSnapshot;
  int8_t         autoCommit;
  int32_t        autoCommitInterval;
85
  int8_t         resetOffsetCfg;
86 87
  uint64_t       consumerId;
  bool           hbBgEnable;
L
Liu Jicong 已提交
88 89
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
90 91

  // status
wmmhello's avatar
wmmhello 已提交
92
  SRWLatch        lock;
L
Liu Jicong 已提交
93 94
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
95 96
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
97
  int32_t epSkipCnt;
L
Liu Jicong 已提交
98
#endif
99
  // poll info
X
Xiaoyu Wang 已提交
100 101
  int64_t pollCnt;
  int64_t totalRows;
wmmhello's avatar
wmmhello 已提交
102
//  bool    needReportOffsetRows;
L
Liu Jicong 已提交
103

L
Liu Jicong 已提交
104
  // timer
X
Xiaoyu Wang 已提交
105 106 107 108 109 110 111 112 113 114
  tmr_h       hbLiveTimer;
  tmr_h       epTimer;
  tmr_h       reportTimer;
  tmr_h       commitTimer;
  STscObj*    pTscObj;       // connection
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of rsp
  STaosQall*  qall;
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
  tsem_t      rspSem;
L
Liu Jicong 已提交
115 116
};

117 118
typedef struct SAskEpInfo {
  int32_t code;
H
Haojun Liao 已提交
119
  tsem_t  sem;
120 121
} SAskEpInfo;

X
Xiaoyu Wang 已提交
122 123 124 125 126 127 128 129
// Values for a vgroup's status (SMqClientVg.vgStatus).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status values.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
134
// Task types written to tmq_t.delayedTask; numbering starts at 1.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

H
Haojun Liao 已提交
140
typedef struct SVgOffsetInfo {
L
Liu Jicong 已提交
141
  STqOffsetVal committedOffset;
142 143
  STqOffsetVal endOffset;        // the last version in TAOS_RES + 1
  STqOffsetVal beginOffset;      // the first version in TAOS_RES
H
Haojun Liao 已提交
144 145 146 147 148 149 150 151 152 153
  int64_t      walVerBegin;
  int64_t      walVerEnd;
} SVgOffsetInfo;

typedef struct {
  int64_t       pollCnt;
  int64_t       numOfRows;
  SVgOffsetInfo offsetInfo;
  int32_t       vgId;
  int32_t       vgStatus;
H
Haojun Liao 已提交
154
  int32_t       vgSkipCnt;              // here used to mark the slow vgroups
155
//  bool          receivedInfoFromVnode;  // has already received info from vnode
H
Haojun Liao 已提交
156 157
  int64_t       emptyBlockReceiveTs;    // once empty block is received, idle for ignoreCnt then start to poll data
  bool          seekUpdated;            // offset is updated by seek operator, therefore, not update by vnode rsp.
H
Haojun Liao 已提交
158
  SEpSet        epSet;
159 160
} SMqClientVg;

L
Liu Jicong 已提交
161
typedef struct {
162 163 164
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
165
  SSchemaWrapper schema;
166 167
} SMqClientTopic;

L
Liu Jicong 已提交
168 169
typedef struct {
  int8_t          tmqRspType;
170
  int32_t         epoch;  // epoch can be used to guard the vgHandle
171
  int32_t         vgId;
wmmhello's avatar
wmmhello 已提交
172
  char            topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
173 174
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
H
Haojun Liao 已提交
175
  uint64_t        reqId;
176
  SEpSet*         pEpset;
L
Liu Jicong 已提交
177
  union {
L
Liu Jicong 已提交
178 179
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
180
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
181
  };
L
Liu Jicong 已提交
182 183
} SMqPollRspWrapper;

L
Liu Jicong 已提交
184
typedef struct {
wmmhello's avatar
wmmhello 已提交
185 186
//  int64_t refId;
//  int32_t epoch;
L
Liu Jicong 已提交
187 188
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
189
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
190

L
Liu Jicong 已提交
191
typedef struct {
192 193 194 195
  int64_t          refId;
  int32_t          epoch;
  void*            pParam;
  __tmq_askep_fn_t pUserFn;
196 197
} SMqAskEpCbParam;

L
Liu Jicong 已提交
198
typedef struct {
199 200
  int64_t         refId;
  int32_t         epoch;
wmmhello's avatar
wmmhello 已提交
201 202 203
  char            topicName[TSDB_TOPIC_FNAME_LEN];
//  SMqClientVg*    pVg;
//  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
204
  int32_t         vgId;
X
Xiaoyu Wang 已提交
205
  uint64_t        requestId;  // request id for debug purpose
X
Xiaoyu Wang 已提交
206
} SMqPollCbParam;
207

208 209 210 211 212 213 214 215 216 217
// State referenced by SMqVgWalInfoParam.pCommon.
typedef struct SMqVgCommon {
  tsem_t        rsp;
  int32_t       numOfRsp;
  SArray*       pList;
  TdThreadMutex mutex;
  int64_t       consumerId;
  char*         pTopicName;
  int32_t       code;
} SMqVgCommon;

218 219 220 221 222
// Semaphore and result code pair.
typedef struct SMqSeekParam {
  tsem_t  sem;
  int32_t code;
} SMqSeekParam;

223 224 225 226 227 228
// Semaphore, result code and a vgroup offset.
typedef struct SMqCommittedParam {
  tsem_t      sem;
  int32_t     code;
  SMqVgOffset vgOffset;
} SMqCommittedParam;

229 230 231 232 233 234 235
// Per-vgroup parameter pointing at a shared SMqVgCommon.
typedef struct SMqVgWalInfoParam {
  int32_t      vgId;
  int32_t      epoch;
  int32_t      totalReq;
  SMqVgCommon* pCommon;
} SMqVgWalInfoParam;

236
typedef struct {
237 238
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
239 240
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
241
  int32_t        code;
242
  tmq_commit_cb* callbackFn;
L
Liu Jicong 已提交
243 244
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
X
Xiaoyu Wang 已提交
245
  void* userParam;
246 247 248 249
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
250
//  SMqVgOffset*         pOffset;
H
Haojun Liao 已提交
251 252 253
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
  int32_t              vgId;
  tmq_t*               pTmq;
254
} SMqCommitCbParam;
255

256 257 258 259 260
// Semaphore and result code pair.
typedef struct SSyncCommitInfo {
  tsem_t  sem;
  int32_t code;
} SSyncCommitInfo;

261
static int32_t doAskEp(tmq_t* tmq);
262 263
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
264
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet);
X
Xiaoyu Wang 已提交
265 266 267
static void    commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void    asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void    addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
268

269
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
270
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
271 272 273 274 275
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

276
  conf->withTbName = false;
L
Liu Jicong 已提交
277
  conf->autoCommit = true;
278
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
279
  conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
280
  conf->hbBgEnable = true;
281

282 283 284
  return conf;
}

L
Liu Jicong 已提交
285
// Release a configuration together with the ip/user/pass copies it holds.
// A NULL conf is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip) {
    taosMemoryFree(conf->ip);
  }
  if (conf->user) {
    taosMemoryFree(conf->user);
  }
  if (conf->pass) {
    taosMemoryFree(conf->pass);
  }
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
301 302 303
  if (conf == NULL || key == NULL || value == NULL){
    return TMQ_CONF_INVALID;
  }
304
  if (strcasecmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
305
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
306
    return TMQ_CONF_OK;
307
  }
L
Liu Jicong 已提交
308

309
  if (strcasecmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
310
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
311 312
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
313

314 315
  if (strcasecmp(key, "enable.auto.commit") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
316
      conf->autoCommit = true;
L
Liu Jicong 已提交
317
      return TMQ_CONF_OK;
318
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
319
      conf->autoCommit = false;
L
Liu Jicong 已提交
320 321 322 323
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
324
  }
L
Liu Jicong 已提交
325

326
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
327
    conf->autoCommitInterval = taosStr2int64(value);
L
Liu Jicong 已提交
328 329 330
    return TMQ_CONF_OK;
  }

331 332 333
  if (strcasecmp(key, "auto.offset.reset") == 0) {
    if (strcasecmp(value, "none") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
L
Liu Jicong 已提交
334
      return TMQ_CONF_OK;
335
    } else if (strcasecmp(value, "earliest") == 0) {
336
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
L
Liu Jicong 已提交
337
      return TMQ_CONF_OK;
338 339
    } else if (strcasecmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
L
Liu Jicong 已提交
340 341 342 343 344
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
345

346 347
  if (strcasecmp(key, "msg.with.table.name") == 0) {
    if (strcasecmp(value, "true") == 0) {
348
      conf->withTbName = true;
L
Liu Jicong 已提交
349
      return TMQ_CONF_OK;
350
    } else if (strcasecmp(value, "false") == 0) {
351
      conf->withTbName = false;
L
Liu Jicong 已提交
352
      return TMQ_CONF_OK;
353 354 355 356 357
    } else {
      return TMQ_CONF_INVALID;
    }
  }

358 359
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
360
      conf->snapEnable = true;
361
      return TMQ_CONF_OK;
362
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
363
      conf->snapEnable = false;
364 365 366 367 368 369
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

370
  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
371
    conf->snapBatchSize = taosStr2int64(value);
L
Liu Jicong 已提交
372 373 374
    return TMQ_CONF_OK;
  }

375
//  if (strcasecmp(key, "enable.heartbeat.background") == 0) {
X
Xiaoyu Wang 已提交
376 377 378 379 380 381 382
    //    if (strcasecmp(value, "true") == 0) {
    //      conf->hbBgEnable = true;
    //      return TMQ_CONF_OK;
    //    } else if (strcasecmp(value, "false") == 0) {
    //      conf->hbBgEnable = false;
    //      return TMQ_CONF_OK;
    //    } else {
383 384
//    tscError("the default value of enable.heartbeat.background is true, can not be seted");
//    return TMQ_CONF_INVALID;
X
Xiaoyu Wang 已提交
385
    //    }
386
//  }
L
Liu Jicong 已提交
387

388
  if (strcasecmp(key, "td.connect.ip") == 0) {
389
    conf->ip = taosStrdup(value);
L
Liu Jicong 已提交
390 391
    return TMQ_CONF_OK;
  }
392

393
  if (strcasecmp(key, "td.connect.user") == 0) {
394
    conf->user = taosStrdup(value);
L
Liu Jicong 已提交
395 396
    return TMQ_CONF_OK;
  }
397

398
  if (strcasecmp(key, "td.connect.pass") == 0) {
399
    conf->pass = taosStrdup(value);
L
Liu Jicong 已提交
400 401
    return TMQ_CONF_OK;
  }
402

403
  if (strcasecmp(key, "td.connect.port") == 0) {
404
    conf->port = taosStr2int64(value);
L
Liu Jicong 已提交
405 406
    return TMQ_CONF_OK;
  }
407

408
  if (strcasecmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
409 410 411
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
412
  return TMQ_CONF_UNKNOWN;
413 414
}

X
Xiaoyu Wang 已提交
415
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
416

L
Liu Jicong 已提交
417
// Append a heap copy of the topic name src to list.
// Returns 0 on success; -1 when list is NULL, src is NULL or empty, or memory cannot be
// obtained. The copy is released again if it cannot be pushed into the list.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL) return -1;
  if (src == NULL || src[0] == 0) return -1;

  char* topic = taosStrdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(&list->container, &topic) == NULL) {
    taosMemoryFree(topic);  // not owned by the list, so it would leak otherwise
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
426
// Destroy the list, passing taosMemoryFree as the element destructor. NULL is ignored.
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
432
int32_t tmq_list_get_size(const tmq_list_t* list) {
433
  if(list == NULL) return -1;
L
Liu Jicong 已提交
434 435 436 437 438
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

char** tmq_list_to_c_array(const tmq_list_t* list) {
439
  if(list == NULL) return NULL;
L
Liu Jicong 已提交
440 441 442 443
  const SArray* container = &list->container;
  return container->pData;
}

H
Haojun Liao 已提交
444
// Response callback of a commit request (installed by doSendCommitMsg()).
// Releases the response buffers and counts the response down on the shared param set.
// pBuf is checked for NULL in the same way tmqHbCb() treats its message argument.
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  if (pBuf) {
    taosMemoryFree(pBuf->pData);
    taosMemoryFree(pBuf->pEpSet);
  }

  commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
  return 0;
}

456
// Build and send one TDMT_VND_TMQ_COMMIT_OFFSET request for (pTopicName, vgId).
//
// The subscription key is "<groupId><TMQ_SEPARATOR><topic>". A group/topic pair that does
// not fit into the key buffer is rejected with TSDB_CODE_INVALID_PARA instead of being
// copied past the end of the buffer.
//
// pParamSet->waitingRspNum and totalRspNum are incremented before the request is handed to
// the transport; tmqCommitCb() is installed as the response callback.
//
// Returns TSDB_CODE_INVALID_PARA / TSDB_CODE_OUT_OF_MEMORY for local failures, otherwise
// the result of asyncSendMsgToServer().
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet) {
  SMqVgOffset vgOffset = {0};

  vgOffset.consumerId = tmq->consumerId;
  vgOffset.offset.val = *offset;

  size_t groupLen = strlen(tmq->groupId);
  size_t topicLen = strlen(pTopicName);
  if (groupLen + 1 + topicLen >= sizeof(vgOffset.offset.subKey)) {
    return TSDB_CODE_INVALID_PARA;
  }
  memcpy(vgOffset.offset.subKey, tmq->groupId, groupLen);
  vgOffset.offset.subKey[groupLen] = TMQ_SEPARATOR;
  memcpy(vgOffset.offset.subKey + groupLen + 1, pTopicName, topicLen + 1);  // includes the terminator

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
  if (code < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  // message layout: SMsgHead followed by the encoded offset
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeMqVgOffset(&encoder, &vgOffset);
  tEncoderClear(&encoder);

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pParam->params = pParamSet;
  pParam->vgId = vgId;
  pParam->pTmq = tmq;
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMsgSendInfo->msgInfo = (SDataBuf) { .pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL };

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;

  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  int64_t transporterId = 0;
  return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
}

H
Haojun Liao 已提交
529 530 531 532 533 534 535 536 537 538 539
// Linear search of tmq->clientTopics for an exact (case-sensitive) topic name match.
// Logs an error and returns NULL when no topic matches.
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; ++i) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, pTopicName) == 0) {
      return pTopic;
    }
  }

  tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName);
  return NULL;
}

544
// Allocate the shared state of one commit operation. refId and epoch are copied from tmq,
// waitingRspNum starts at rspNum and every other field is zero.
// Returns NULL when the allocation fails.
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum) {
  SMqCommitCbParamSet* pSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pSet == NULL) {
    return NULL;
  }

  pSet->refId = tmq->refId;
  pSet->epoch = tmq->epoch;
  pSet->callbackFn = pCommitFp;
  pSet->userParam = userParam;
  pSet->waitingRspNum = rspNum;

  return pSet;
}
558

559 560 561


// Look up the vgroup vgId of topic pTopicName.
// On success *pVg points at the entry inside the topic's vgs array. *pVg is reset to NULL
// first, so the result does not depend on what the caller stored there.
// Returns TSDB_CODE_TMQ_INVALID_TOPIC for an unknown topic and TSDB_CODE_TMQ_INVALID_VGID
// when the topic has no such vgroup.
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
  *pVg = NULL;

  SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
  if (pTopic == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_TMQ_INVALID_TOPIC;
  }

  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg->vgId == vgId) {
      *pVg = pClientVg;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_TMQ_INVALID_VGID;
}

// Send a commit request for one (topic, vgroup) with an explicit offset.
//
// Runs under the consumer's read latch. Fails with TSDB_CODE_TMQ_INVALID_MSG when the
// offset type is <= 0 or the offset equals the one already recorded as committed.
// On success the vgroup's committedOffset is updated to *offsetVal.
// When sending fails the log reports the code returned by doSendCommitMsg().
//
// Returns 0 on success or the error code; the caller is responsible for invoking the
// user callback on failure.
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) {
  int32_t      code = 0;
  SMqClientVg* pVg = NULL;
  char         offsetBuf[TSDB_OFFSET_LEN] = {0};
  char         commitBuf[TSDB_OFFSET_LEN] = {0};

  tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
  taosRLockLatch(&tmq->lock);

  code = getClientVg(tmq, pTopicName, vgId, &pVg);
  if (code != 0) {
    goto end;
  }

  if (offsetVal->type <= 0 || tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto end;
  }

  // textual forms are only used for logging
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);

  SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0);
  if (pParamSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(code));
    taosMemoryFree(pParamSet);
    goto end;
  }

  tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
          tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
  pVg->offsetInfo.committedOffset = *offsetVal;

end:
  taosRUnLockLatch(&tmq->lock);
  return code;
}

621 622 623 624 625 626 627 628 629
// Commit the offset carried by a poll result.
// Topic, vgroup id and rspOffset are taken from the result object according to its kind
// (data, meta or metadata) and passed to asyncCommitOffset(). Any failure, including NULL
// arguments or an unrecognised result kind, is reported through pCommitFp when it is set.
static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
  char*        pTopicName = NULL;
  int32_t      vgId = 0;
  STqOffsetVal offsetVal = {0};
  int32_t      code = 0;

  if (pRes == NULL || tmq == NULL) {
    code = TSDB_CODE_INVALID_PARA;
  } else if (TD_RES_TMQ(pRes)) {
    SMqRspObj* pRspObj = (SMqRspObj*)pRes;
    pTopicName = pRspObj->topic;
    vgId = pRspObj->vgId;
    offsetVal = pRspObj->rsp.rspOffset;
  } else if (TD_RES_TMQ_META(pRes)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes;
    pTopicName = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
    offsetVal = pMetaRspObj->metaRsp.rspOffset;
  } else if (TD_RES_TMQ_METADATA(pRes)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes;
    pTopicName = pRspObj->topic;
    vgId = pRspObj->vgId;
    offsetVal = pRspObj->rsp.rspOffset;
  } else {
    code = TSDB_CODE_TMQ_INVALID_MSG;
  }

  if (code == 0) {
    code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
  }

  if (code != TSDB_CODE_SUCCESS && pCommitFp != NULL) {
    pCommitFp(tmq, code, userParam);
  }
}
659

660 661
// Send a commit request for every vgroup whose endOffset is valid (type > 0) and differs
// from its committedOffset.
//
// The shared param set starts with waitingRspNum == 1 (see the original note: "init as 1 to
// prevent concurrency issue"); that extra count is dropped through commitRspCountDown()
// once all requests have been issued. When no request was sent at all, the param set is
// released here and pCommitFp is invoked directly with the last code.
// A failed send is logged with the code returned by doSendCommitMsg() and skipped.
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  int32_t code = 0;

  // init as 1 to prevent concurrency issue
  SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1);
  if (pParamSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  taosRLockLatch(&tmq->lock);
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscInfo("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);

    tscInfo("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg*   pVg = taosArrayGet(pTopic->vgs, j);
      SVgOffsetInfo* pInfo = &pVg->offsetInfo;

      bool needCommit = pInfo->endOffset.type > 0 && !tOffsetEqual(&pInfo->endOffset, &pInfo->committedOffset);
      if (!needCommit) {
        tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
        continue;
      }

      // textual forms are only used for logging
      char offsetBuf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetBuf, tListLen(offsetBuf), &pInfo->endOffset);

      char commitBuf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(commitBuf, tListLen(commitBuf), &pInfo->committedOffset);

      code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pInfo->endOffset, pTopic->topicName, pParamSet);
      if (code != TSDB_CODE_SUCCESS) {
        tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(code), j + 1, numOfVgroups);
        continue;
      }

      tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d",
              tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups);
      pInfo->committedOffset = pInfo->endOffset;
    }
  }
  taosRUnLockLatch(&tmq->lock);

  tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics);

  // request is sent
  if (pParamSet->totalRspNum != 0) {
    // count down since waiting rsp num init as 1
    commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
    return;
  }

end:
  taosMemoryFree(pParamSet);
  if (pCommitFp != NULL) {
    pCommitFp(tmq, code, userParam);
  }
  return;
}

723 724
// Queue a delayed task of the given type (TMQ_DELAYED_TASK__*) on the consumer identified
// by refId and post its rspSem. Does nothing when the consumer can no longer be acquired.
// The reference is released on every path, including when the queue item cannot be allocated.
static void generateTimedTask(int64_t refId, int32_t type) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) return;

  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
  if (pTaskType != NULL) {
    *pTaskType = type;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }

  taosReleaseRef(tmqMgmt.rsetId, refId);
}

// Timer callback: queue an ASK_EP task for the consumer whose refId is stored in param,
// then free param (a heap-allocated int64_t).
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t consumerRef = *(int64_t*)param;
  generateTimedTask(consumerRef, TMQ_DELAYED_TASK__ASK_EP);
  taosMemoryFree(param);
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
743
  int64_t refId = *(int64_t*)param;
744
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
745
  taosMemoryFree(param);
L
Liu Jicong 已提交
746 747
}

wmmhello's avatar
wmmhello 已提交
748 749 750 751 752 753 754 755 756 757 758 759 760
//void tmqAssignDelayedReportTask(void* param, void* tmrId) {
//  int64_t refId = *(int64_t*)param;
//  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
//  if (tmq != NULL) {
//    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
//    *pTaskType = TMQ_DELAYED_TASK__REPORT;
//    taosWriteQitem(tmq->delayedTask, pTaskType);
//    tsem_post(&tmq->rspSem);
//  }
//
//  taosReleaseRef(tmqMgmt.rsetId, refId);
//  taosMemoryFree(param);
//}
L
Liu Jicong 已提交
761

762
// Response callback of the heartbeat request: releases the response buffers (when a
// message is given) and always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
771
  int64_t refId = *(int64_t*)param;
772

X
Xiaoyu Wang 已提交
773
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
774
  if (tmq == NULL) {
L
Liu Jicong 已提交
775
    taosMemoryFree(param);
776 777
    return;
  }
D
dapan1121 已提交
778 779 780 781

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
wmmhello's avatar
wmmhello 已提交
782
  taosRLockLatch(&tmq->lock);
wmmhello's avatar
wmmhello 已提交
783
//  if(tmq->needReportOffsetRows){
784 785 786 787 788 789 790 791 792 793 794 795
    req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
    for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);
      TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
      strcpy(data->topicName, pTopic->topicName);
      data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
      for(int j = 0; j < numOfVgroups; j++){
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
        offRows->vgId = pVg->vgId;
        offRows->rows = pVg->numOfRows;
796
        offRows->offset = pVg->offsetInfo.beginOffset;
797 798
        char buf[TSDB_OFFSET_LEN] = {0};
        tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
wmmhello's avatar
wmmhello 已提交
799
        tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows);
800
      }
801
    }
wmmhello's avatar
wmmhello 已提交
802 803
//    tmq->needReportOffsetRows = false;
//  }
wmmhello's avatar
wmmhello 已提交
804
  taosRUnLockLatch(&tmq->lock);
D
dapan1121 已提交
805

L
Liu Jicong 已提交
806
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
807 808
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
809
    goto OVER;
D
dapan1121 已提交
810
  }
811

L
Liu Jicong 已提交
812
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
813 814
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
815
    goto OVER;
D
dapan1121 已提交
816
  }
817

D
dapan1121 已提交
818 819 820
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
821
    goto OVER;
D
dapan1121 已提交
822
  }
823 824 825 826

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
827
    goto OVER;
828
  }
829

830
  sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
831 832 833 834 835

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
836
  sendInfo->msgType = TDMT_MND_TMQ_HB;
837 838 839 840 841 842 843

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
844
  tDeatroySMqHbReq(&req);
845
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
846
  taosReleaseRef(tmqMgmt.rsetId, refId);
847 848
}

849 850
/*
 * Fallback commit callback used when the user did not register one:
 * it only logs a failed offset commit. `param` is unused.
 */
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;  // commit succeeded, nothing to report
  }

  tscError("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}

855
/*
 * Drain the consumer's delayed-task queue and execute every pending task.
 *
 * ASK_EP tasks issue an asynchronous endpoint query and re-arm the 1s endpoint timer;
 * COMMIT tasks issue an asynchronous commit of all offsets and re-arm the auto-commit
 * timer; REPORT tasks are currently a no-op.
 *
 * Returns TSDB_CODE_SUCCESS (0) once the queue has been processed, or
 * TSDB_CODE_OUT_OF_MEMORY when the temporary item list cannot be allocated.
 */
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      asyncAskEp(pTmq, addToQueueCallbackFn, NULL);

      // the timer callback receives a heap-allocated copy of the ref id
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        // without the ref id the timer cannot be re-armed; report it instead of dereferencing NULL
        tscError("consumer:0x%" PRIx64 " failed to re-arm ep timer since out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;

      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to re-arm commit timer since out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do for report tasks
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

898
/*
 * Release the heap members owned by a response wrapper, chosen by its tmqRspType.
 * The wrapper itself is NOT freed here (callers pass it to taosFreeQitem afterwards).
 * Always returns NULL.
 */
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__END_RSP:
      // do nothing
      break;

    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pEpWrapper->msg);
      break;
    }

    case TMQ_MSG_TYPE__POLL_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->dataRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->dataRsp.blockDataLen);
      taosArrayDestroyP(pPoll->dataRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosMemoryFree(pPoll->metaRsp.metaRsp);
      break;
    }

    case TMQ_MSG_TYPE__TAOSX_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
      taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      // taosx
      taosArrayDestroy(pPoll->taosxRsp.createTableLen);
      taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
      break;
    }

    default:
      // unknown types own nothing that is released here
      break;
  }

  return NULL;
}

L
Liu Jicong 已提交
933
/*
 * Discard every response the consumer has not processed yet.
 * Pass 0 drains what is already staged in tmq->qall; pass 1 moves everything still
 * sitting in tmq->mqueue into tmq->qall and drains that as well.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  for (int32_t pass = 0; pass < 2; ++pass) {
    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }

    SMqRspWrapper* pItem = NULL;
    for (;;) {
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        break;
      }

      tmqFreeRspWrapper(pItem);  // release the payload first ...
      taosFreeQitem(pItem);      // ... then the queue item itself
    }
  }
}

D
dapan1121 已提交
958
/*
 * Response callback for the subscribe request sent by tmq_subscribe().
 *
 * Stores the response code in the waiter's SMqSubscribeCbParam, releases the response
 * buffers and wakes the thread blocked on rspSem. Always returns 0.
 *
 * The buffers are released BEFORE the semaphore is posted: `param` lives on the stack of
 * the waiting thread and must not be touched once that thread has been woken up.
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  if (pMsg != NULL) {
    // the payload is released here like in the other response callbacks of this file;
    // previously only pEpSet was freed and pData leaked
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  tsem_post(&pParam->rspSem);
  return 0;
}
966

L
Liu Jicong 已提交
967
/*
 * Append the names of all currently subscribed topics to *topics.
 *
 * tmq     consumer handle
 * topics  in/out list; when *topics is NULL a new list is created for the caller
 *
 * Stored topic names have the form "<prefix>.<topic>"; only the part after the first
 * '.' is reported. A name without '.' is reported unchanged.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when tmq or topics is NULL, and
 * TSDB_CODE_OUT_OF_MEMORY when the list cannot be created.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (tmq == NULL || topics == NULL) return TSDB_CODE_INVALID_PARA;

  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  taosRLockLatch(&tmq->lock);
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr() returns NULL when there is no '.', so never blindly add 1 to its result
    const char* dot = strchr(topic->topicName, '.');
    const char* shortName = (dot != NULL) ? dot + 1 : topic->topicName;
    (void)tmq_list_append(*topics, shortName);
  }
  taosRUnLockLatch(&tmq->lock);
  return 0;
}

L
Liu Jicong 已提交
981
/*
 * Unsubscribe the consumer from all topics by subscribing to an empty topic list.
 *
 * With auto-commit enabled the current offsets are committed synchronously first; a
 * commit failure is returned as is. The subscribe request is retried every 500ms while
 * the mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY, until retryCnt exceeds 5.
 *
 * Returns the result of the last subscribe attempt, TSDB_CODE_INVALID_PARA for a NULL
 * handle, or TSDB_CODE_OUT_OF_MEMORY when the empty list cannot be allocated.
 */
int32_t tmq_unsubscribe(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;

  if (tmq->autoCommit) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }
  }
  taosSsleep(2);  // sleep 2s for hb to send offset and rows to server

  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t retryCnt = 0;
  int32_t rsp = tmq_subscribe(tmq, lst);
  while (rsp == TSDB_CODE_MND_CONSUMER_NOT_READY && retryCnt <= 5) {
    retryCnt++;
    taosMsleep(500);
    rsp = tmq_subscribe(tmq, lst);
  }

  tmq_list_destroy(lst);
  return rsp;
}

1008 1009 1010 1011 1012 1013
static void freeClientVgImpl(void* param) {
  SMqClientTopic* pTopic = param;
  taosMemoryFreeClear(pTopic->schema.pSchema);
  taosArrayDestroy(pTopic->vgs);
}

1014
void tmqFreeImpl(void* handle) {
1015 1016
  tmq_t*  tmq = (tmq_t*)handle;
  int64_t id = tmq->consumerId;
L
Liu Jicong 已提交
1017

1018
  // TODO stop timer
L
Liu Jicong 已提交
1019 1020 1021 1022
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
L
Liu Jicong 已提交
1023

H
Haojun Liao 已提交
1024 1025 1026 1027 1028
  if (tmq->delayedTask) {
    taosCloseQueue(tmq->delayedTask);
  }

  taosFreeQall(tmq->qall);
1029
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
1030

1031
  taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
1032 1033
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
1034 1035

  tscDebug("consumer:0x%" PRIx64 " closed", id);
L
Liu Jicong 已提交
1036 1037
}

1038 1039 1040 1041 1042 1043 1044 1045 1046
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
1047
  if (tmqMgmt.rsetId < 0) {
1048 1049 1050 1051
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
1052
/*
 * Create a new consumer from a configuration object.
 *
 * conf       consumer configuration; a non-empty groupId is required
 * errstr     not written by this function (NOTE(review): looks like it is meant to receive
 * errstrLen  an error description - confirm against the public API contract)
 *
 * Returns the new consumer, or NULL on failure. On allocation/setup failures terrno is
 * set to TSDB_CODE_OUT_OF_MEMORY (this also covers an empty group id).
 *
 * Cleanup rule: before the object is registered in the reference set it is destroyed with
 * tmqFreeImpl() directly; after taosAddRef() succeeded it must be released through
 * taosRemoveRef() so that the reference set does not keep a dangling pointer.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  if (conf == NULL) return NULL;

  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->delayedTask = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
      conf->groupId[0] == 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId has not been filled in yet, so report the configured group id
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), conf->groupId);
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf (bounded copies, always NUL-terminated)
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  taosInitRWLatch(&pTmq->lock);

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto _failed;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    // rspSem is destroyed by tmqFreeImpl() below; destroying it here as well would destroy it twice
    goto _failed;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    goto _failed;
  }

  if (pTmq->hbBgEnable) {
    // the heartbeat timer callback receives a heap-allocated copy of the ref id
    int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      // the object is owned by the reference set now: dropping the reference destroys it
      taosRemoveRef(tmqMgmt.rsetId, pTmq->refId);
      return NULL;
    }

    *pRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
  }

  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf, pTmq->hbBgEnable);

  return pTmq;

_failed:
  tmqFreeImpl(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1146
/*
 * Subscribe the consumer to the given topic list (an empty list unsubscribes).
 *
 * The subscribe request is sent to the mnode and this call blocks until the response
 * arrives (see tmqSubscribeCb). Afterwards the endpoint information is polled every
 * 500ms while the mnode reports TSDB_CODE_MND_CONSUMER_NOT_READY, and the endpoint and
 * auto-commit timers are started if they are not running yet.
 *
 * Returns 0 on success or an error code: TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_OUT_OF_MEMORY for allocation failures, TSDB_CODE_TSC_INTERNAL_ERROR when the
 * response semaphore cannot be created, the code of the subscribe response, or
 * TSDB_CODE_MND_CONSUMER_NOT_READY when the retries are exhausted.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;

  const int32_t       MAX_RETRY_COUNT = 120 * 2;  // let's wait for 2 mins at most
  const SArray*       container = &topic_list->container;
  int32_t             sz = taosArrayGetSize(container);
  void*               buf = NULL;
  SMsgSendInfo*       sendInfo = NULL;
  SCMSubscribeReq     req = {0};
  SMqSubscribeCbParam param = {.rspErr = 0};
  int32_t             tlen = 0;
  int32_t             code = 0;

  tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  req.withTbName = tmq->withTbName;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.resetOffsetCfg = tmq->resetOffsetCfg;

  // translate every topic into its full name; req.topicNames owns the allocated strings
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      // must not leave code at 0, otherwise the failure is reported as success
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      taosMemoryFree(topicFName);  // not owned by the array yet
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
  }

  tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  // param lives on this stack frame, so wait here until tmqSubscribeCb has posted
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
    if (retryCnt++ > MAX_RETRY_COUNT) {
      tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  // shared exit path for success and failure: buf/sendInfo are NULL once the message was sent
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1271
/*
 * Register the callback (and its user parameter) that consumers created from `conf`
 * use for commit results. A NULL conf is ignored.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf != NULL) {
    conf->commitCb = cb;
    conf->commitCbUserParam = param;
  }
}
1276

wmmhello's avatar
wmmhello 已提交
1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304
/*
 * Look up the vgroup entry `vgId` of topic `topicName` in tmq->clientTopics.
 * Returns a pointer into the topic's vgroup array, or NULL when there is no match.
 * This function takes no lock itself.
 */
static SMqClientVg* getVgInfo(tmq_t* tmq, char* topicName, int32_t  vgId){
  const int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  for (int t = 0; t < numOfTopics; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, topicName) != 0) {
      continue;  // different topic
    }

    const int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < numOfVgs; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId == vgId) {
        return pVg;
      }
    }
  }

  return NULL;
}

/*
 * Return the first entry of tmq->clientTopics whose topicName equals `topicName`,
 * or NULL when no such topic exists.
 */
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){
  const int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  for (int t = 0; t < numOfTopics; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, topicName) != 0) {
      continue;
    }
    return pTopic;
  }

  return NULL;
}

D
dapan1121 已提交
1305
/*
 * Response callback of a poll request sent to one vgroup.
 *
 * param  SMqPollCbParam of the request; always freed here
 * pMsg   response buffers; pData and pEpSet are consumed (freed or handed to the wrapper)
 * code   response code of the request
 *
 * On success the response is decoded into a SMqPollRspWrapper, queued on tmq->mqueue and
 * 0 is returned. Responses from an earlier epoch are dropped and 0 is returned as well.
 * On error -1 is returned and, if the request epoch still matches the consumer epoch, the
 * vgroup is set back to idle. Whenever the consumer is still alive, rspSem is posted.
 */
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pCbParam = (SMqPollCbParam*)param;
  int64_t         refId = pCbParam->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    // consumer is gone: drop everything that came with the response
    taosMemoryFree(pCbParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t  epoch = pCbParam->epoch;
  int32_t  vgId = pCbParam->vgId;
  uint64_t requestId = pCbParam->requestId;

  if (code != 0) {
    if (pMsg->pData) {
      taosMemoryFree(pMsg->pData);
    }
    if (pMsg->pEpSet) {
      taosMemoryFree(pMsg->pEpSet);
    }

    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      // consumer mismatch: switch to RECOVER and let the re-balance settle
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
               tmq->consumerId);
    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      // queue an END_RSP marker for the poller
      SMqPollRspWrapper* pEndWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pEndWrapper == NULL) {
        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
                tmq->consumerId, vgId, epoch, requestId);
      } else {
        pEndWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
        taosWriteQitem(tmq->mqueue, pEndWrapper);
      }
    } else {
      tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
               vgId, epoch, tstrerror(code), requestId);
    }

    goto _pollFailed;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < clientEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("consumer:0x%" PRIx64
            " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);

    tsem_post(&tmq->rspSem);
    taosReleaseRef(tmqMgmt.rsetId, refId);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pCbParam);

    return 0;
  }

  if (msgEpoch != clientEpoch) {
    tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
  }

  // the response type selects the decoder below
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId,
            epoch);
    goto _pollFailed;
  }

  pWrapper->tmqRspType = rspType;
  pWrapper->reqId = requestId;
  pWrapper->pEpset = pMsg->pEpSet;  // ownership of the ep set moves to the wrapper
  pWrapper->vgId = vgId;
  strcpy(pWrapper->topicName, pCbParam->topicName);

  pMsg->pEpSet = NULL;

  // the payload follows the SMqRspHead; after decoding, the head is copied over the start of the decoded rsp
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqDataRsp(&decoder, &pWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    char offsetBuf[TSDB_OFFSET_LEN] = {0};
    tFormatOffset(offsetBuf, TSDB_OFFSET_LEN, &pWrapper->dataRsp.rspOffset);
    tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, pWrapper->dataRsp.reqOffset.version, offsetBuf, rspType, requestId);
  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqMetaRsp(&decoder, &pWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {  // invalid rspType
    tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
  }

  taosMemoryFree(pMsg->pData);
  taosWriteQitem(tmq->mqueue, pWrapper);

  int32_t total = taosQueueItemSize(tmq->mqueue);
  tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
           tmq->consumerId, rspType, vgId, total, requestId);

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pCbParam);

  return 0;

_pollFailed:
  // only reset the vgroup when the request belongs to the current epoch
  if (epoch == tmq->epoch) {
    taosWLockLatch(&tmq->lock);
    SMqClientVg* pVg = getVgInfo(tmq, pCbParam->topicName, vgId);
    if (pVg) {
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
    }
    taosWUnLockLatch(&tmq->lock);
  }

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pCbParam);

  return -1;
}

H
Haojun Liao 已提交
1456
typedef struct SVgroupSaveInfo {
wmmhello's avatar
wmmhello 已提交
1457 1458
  STqOffsetVal currentOffset;
  STqOffsetVal commitOffset;
1459
  STqOffsetVal seekOffset;
H
Haojun Liao 已提交
1460 1461 1462
  int64_t      numOfRows;
} SVgroupSaveInfo;

H
Haojun Liao 已提交
1463 1464 1465 1466 1467 1468
/*
 * Build a client-side topic entry from one topic of an ask-ep response.
 *
 * pTopic            entry to fill (expected to be zero-initialised by the caller)
 * pTopicEp          topic from the response; its schema is moved into pTopic
 * pVgOffsetHashMap  SVgroupSaveInfo saved from the old topic list, keyed by topic/vgroup key
 * tmq               consumer; provides the reset-offset policy for vgroups without saved state
 *
 * Vgroups that have saved state continue from it, all others start from an offset of
 * type tmq->resetOffsetCfg. If the vgroup array cannot be allocated the topic is left
 * without vgroups (pTopic->vgs == NULL) and an error is logged.
 */
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
                                   tmq_t* tmq) {
  // move the schema out of the response so that it is not freed together with it
  pTopic->schema = pTopicEp->schema;
  pTopicEp->schema.nCols = 0;
  pTopicEp->schema.pSchema = NULL;

  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);

  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);

  tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
  if (pTopic->vgs == NULL) {
    // pushing into a NULL array would crash; leave the topic without vgroups instead
    tscError("consumer:0x%" PRIx64 ", failed to allocate vgroup list for topic:%s since out of memory",
             tmq->consumerId, pTopic->topicName);
    return;
  }

  for (int32_t j = 0; j < vgNumGet; j++) {
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);

    makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));

    // default offset for vgroups that were not part of the previous assignment
    STqOffsetVal offsetNew = {0};
    offsetNew.type = tmq->resetOffsetCfg;

    SMqClientVg clientVg = {
        .pollCnt = 0,
        .vgId = pVgEp->vgId,
        .epSet = pVgEp->epSet,
        .vgStatus = TMQ_VG_STATUS__IDLE,
        .vgSkipCnt = 0,
        .emptyBlockReceiveTs = 0,
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
    };

    clientVg.offsetInfo.endOffset = pInfo ? pInfo->currentOffset : offsetNew;
    clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew;
    clientVg.offsetInfo.beginOffset = pInfo ? pInfo->seekOffset : offsetNew;
    clientVg.offsetInfo.walVerBegin = -1;
    clientVg.offsetInfo.walVerEnd = -1;
    clientVg.seekUpdated = false;

    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
      tscError("consumer:0x%" PRIx64 ", failed to add vgId:%d to topic:%s since out of memory", tmq->consumerId,
               pVgEp->vgId, pTopic->topicName);
    }
  }
}

static void freeClientVgInfo(void* param) {
  SMqClientTopic* pTopic = param;
  if (pTopic->schema.nCols) {
    taosMemoryFreeClear(pTopic->schema.pSchema);
  }

  taosArrayDestroy(pTopic->vgs);
}

1518
/*
 * Replace the consumer's topic/vgroup list with the one from an ask-ep response.
 *
 * Nothing happens unless `epoch` is newer than tmq->epoch. Under the write lock the
 * per-vgroup progress of the current topics is saved in a temporary hash, the new topic
 * list is built from pRsp (carrying the saved progress over) and swapped in. Afterwards
 * the consumer status becomes NO_TOPIC or READY and the epoch is updated.
 *
 * NOTE(review): the result is always false - the original `set` flag was never assigned
 * true. Kept as is because the callers are not visible here.
 */
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  int32_t numOfIncoming = taosArrayGetSize(pRsp->topics);
  if (epoch <= tmq->epoch) {
    return false;
  }

  SArray* pNewTopics = taosArrayInit(numOfIncoming, sizeof(SMqClientTopic));
  if (pNewTopics == NULL) {
    return false;
  }

  SHashObj* pSavedProgress = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pSavedProgress == NULL) {
    taosArrayDestroy(pNewTopics);
    return false;
  }

  taosWLockLatch(&tmq->lock);
  int32_t numOfExisting = taosArrayGetSize(tmq->clientTopics);

  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
          tmq->consumerId, tmq->epoch, epoch, numOfIncoming, numOfExisting);

  // step 1: remember the progress of every vgroup of the current topics
  // todo extract method
  for (int32_t i = 0; i < numOfExisting; i++) {
    SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, i);
    if (pOldTopic->vgs == NULL) {
      continue;
    }

    int32_t numOfOldVgs = taosArrayGetSize(pOldTopic->vgs);
    tscInfo("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, numOfOldVgs);

    for (int32_t j = 0; j < numOfOldVgs; j++) {
      SMqClientVg* pOldVg = taosArrayGet(pOldTopic->vgs, j);
      makeTopicVgroupKey(vgKey, pOldTopic->topicName, pOldVg->vgId);

      char offsetBuf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetBuf, TSDB_OFFSET_LEN, &pOldVg->offsetInfo.endOffset);
      tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pOldVg->vgId,
              vgKey, offsetBuf);

      SVgroupSaveInfo saved = {.currentOffset = pOldVg->offsetInfo.endOffset,
                               .seekOffset = pOldVg->offsetInfo.beginOffset,
                               .commitOffset = pOldVg->offsetInfo.committedOffset,
                               .numOfRows = pOldVg->numOfRows};
      taosHashPut(pSavedProgress, vgKey, strlen(vgKey), &saved, sizeof(SVgroupSaveInfo));
    }
  }

  // step 2: build the new topic list from the response
  for (int32_t i = 0; i < numOfIncoming; i++) {
    SMqClientTopic newTopic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    initClientTopicFromRsp(&newTopic, pTopicEp, pSavedProgress, tmq);
    taosArrayPush(pNewTopics, &newTopic);
  }

  taosHashCleanup(pSavedProgress);

  // step 3: destroy current buffered existed topics info and install the new list
  if (tmq->clientTopics) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
  }
  tmq->clientTopics = pNewTopics;
  taosWUnLockLatch(&tmq->lock);

  int8_t newStatus = (numOfIncoming == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
  atomic_store_8(&tmq->status, newStatus);
  atomic_store_32(&tmq->epoch, epoch);

  tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
  return false;
}

1589
/**
 * Callback invoked when the response to an "ask endpoint" request arrives.
 *
 * @param param  heap-allocated SMqAskEpCbParam; always freed here.
 * @param pMsg   response buffer; pMsg->pData and pMsg->pEpSet are always freed here.
 * @param code   result code of the request.
 * @return TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer reference can no longer be
 *         acquired (terrno is set to the same value), otherwise `code`.
 *
 * The user callback stored in the param is invoked with a NULL message on failure and
 * with `pMsg` on success; it is not invoked at all when the consumer is already gone.
 */
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  SMqRspHead*      head = NULL;
  int32_t          epoch = 0;
  int32_t          ret = code;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    // consumer already released: there is nobody to notify, only the buffers to drop
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    ret = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto _cleanup;
  }

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    pParam->pUserFn(tmq, code, NULL, pParam->pParam);
    goto _release;
  }

  // tmq's epoch is monotonically increasing, so it is safe to discard any message that
  // carries an old epoch. The epoch only advances when a newer ep message is received.
  head = pMsg->pData;
  epoch = atomic_load_32(&tmq->epoch);
  if (head->epoch <= epoch) {
    tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
            tmq->consumerId, head->epoch, epoch);

    // status is written atomically below and read atomically elsewhere; read it the same way here
    if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
      SMqAskEpRsp rsp = {0};
      tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
      int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
      atomic_store_8(&tmq->status, flag);
      tDeleteSMqAskEpRsp(&rsp);
    }
  } else {
    tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
            head->epoch, epoch);
  }

  pParam->pUserFn(tmq, code, pMsg, pParam->pParam);

_release:
  taosReleaseRef(tmqMgmt.rsetId, pParam->refId);

_cleanup:
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pParam);
  return ret;
}

L
Liu Jicong 已提交
1646
/**
 * Fill a poll request for one vgroup of one topic.
 *
 * The subscription key is written as "<groupId><TMQ_SEPARATOR><topicName>"; the remaining
 * fields are copied from the consumer and the vgroup, and a fresh request id is generated.
 */
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // compose the subscription key: group id, separator, topic name
  char*   pKey = pReq->subKey;
  int32_t sepPos = strlen(tmq->groupId);
  memcpy(pKey, tmq->groupId, sepPos);
  pKey[sepPos] = TMQ_SEPARATOR;
  strcpy(&pKey[sepPos + 1], pTopic->topicName);

  // consumer-level settings
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->timeout = timeout;

  // vgroup-level settings: continue from the end offset recorded for this vgroup
  pReq->head.vgId = pVg->vgId;
  pReq->reqOffset = pVg->offsetInfo.endOffset;

  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1662 1663
/**
 * Build a meta result object from a poll response wrapper.
 *
 * The topic name, db name and vgroup id are taken from the handles stored in the wrapper,
 * and the meta response is copied byte-wise (shallow copy).
 *
 * @return the newly allocated object, or NULL when the allocation fails.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    // do not dereference a failed allocation; the caller treats NULL as "no result"
    tscError("failed to allocate meta rsp obj, topic:%s vgId:%d", pWrapper->topicName, pWrapper->vgId);
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

1673
/**
 * Build a data result object from a poll response wrapper and count its rows.
 *
 * @param pWrapper   response wrapper; its topicHandle and vgHandle must already be set.
 * @param pVg        vgroup whose running row counter is increased by the rows found.
 * @param numOfRows  out: reset to 0, then set to the number of rows in this response.
 * @return the newly allocated object, or NULL when the allocation fails (in which case
 *         *numOfRows is 0 and pVg is untouched).
 *
 * The data response is copied byte-wise, so the block arrays it points to are shared with
 * the returned object rather than duplicated.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  (*numOfRows) = 0;

  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // do not dereference a failed allocation; the caller treats NULL as "no result"
    tscError("failed to allocate data rsp obj, topic:%s vgId:%d", pWrapper->topicName, pWrapper->vgId);
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);

  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  // when the response carries no schema of its own, fall back to the schema kept for the topic
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }

  return pRspObj;
}

1703
/**
 * Build a metadata ("taosx") result object from a poll response wrapper and count its rows.
 *
 * @param pWrapper   response wrapper; its topicHandle and vgHandle must already be set.
 * @param pVg        vgroup whose running row counter is increased by the rows found.
 * @param numOfRows  in/out: the rows of this response are ADDED to it; unlike
 *                   tmqBuildRspFromWrapper it is not reset here, so the caller initializes it.
 * @return the newly allocated object, or NULL when the allocation fails.
 *
 * The taosx response is copied byte-wise, so the block arrays it points to are shared with
 * the returned object rather than duplicated.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    // do not dereference a failed allocation; the caller treats NULL as "no result"
    tscError("failed to allocate taosx rsp obj, topic:%s vgId:%d", pWrapper->topicName, pWrapper->vgId);
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  // when the response carries no schema of its own, fall back to the schema kept for the topic
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }

  return pRspObj;
}

1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760
// Shared failure exit used by doTmqPollImpl when a poll request cannot be prepared.
// Marks the vgroup idle again, posts the consumer's response semaphore, and returns -1
// for the caller to propagate.
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
  // put the vgroup back to idle first so that a later poll round can pick it up again
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&pTmq->rspSem);
  return -1;
}

/**
 * Serialize and send one poll request for a single vgroup of a topic.
 *
 * On any preparation failure everything allocated so far is released and the result of
 * handleErrorBeforePoll() is returned. On success the message is handed to
 * asyncSendMsgToServer(), the poll counters are bumped and TSDB_CODE_SUCCESS is returned.
 */
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  char*           msg = NULL;
  SMqPollCbParam* pParam = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  int32_t         msgSize = 0;
  int64_t         transporterId = 0;
  char            offsetFormatBuf[TSDB_OFFSET_LEN] = {0};

  SMqPollReq req = {0};
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first pass only computes the encoded size
  msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    goto _fail;
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (msg == NULL) {
    goto _fail;
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    goto _free_msg;
  }

  pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    goto _free_msg;
  }

  // the callback gets the topic name and vgroup id instead of pointers,
  // because the vgroup/topic objects may be released before the response arrives
  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  strcpy(pParam->topicName, pTopic->topicName);
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    goto _free_param;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);

  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

  pVg->pollCnt++;
  pVg->seekUpdated = false;  // reset this flag.
  pTmq->pollCnt++;

  return TSDB_CODE_SUCCESS;

_free_param:
  taosMemoryFree(pParam);
_free_msg:
  taosMemoryFree(msg);
_fail:
  return handleErrorBeforePoll(pVg, pTmq);
}

1796
// broadcast the poll request to all related vnodes
H
Haojun Liao 已提交
1797
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1798 1799 1800
  if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
    return 0;
  }
wmmhello's avatar
wmmhello 已提交
1801 1802 1803
  int32_t code = 0;

  taosWLockLatch(&tmq->lock);
1804
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1805
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
1806 1807

  for (int i = 0; i < numOfTopics; i++) {
X
Xiaoyu Wang 已提交
1808
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
X
Xiaoyu Wang 已提交
1809
    int32_t         numOfVg = taosArrayGetSize(pTopic->vgs);
1810 1811

    for (int j = 0; j < numOfVg; j++) {
X
Xiaoyu Wang 已提交
1812
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
wmmhello's avatar
wmmhello 已提交
1813
      if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) {  // less than 10ms
1814
        tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
X
Xiaoyu Wang 已提交
1815
                 tmq->epoch, pVg->vgId);
H
Haojun Liao 已提交
1816 1817 1818
        continue;
      }

1819
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1820
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1821
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
1822
        tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
X
Xiaoyu Wang 已提交
1823
                 pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1824 1825
        continue;
      }
1826

L
Liu Jicong 已提交
1827
      atomic_store_32(&pVg->vgSkipCnt, 0);
wmmhello's avatar
wmmhello 已提交
1828
      code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
1829
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1830
        goto end;
D
dapan1121 已提交
1831
      }
X
Xiaoyu Wang 已提交
1832 1833
    }
  }
1834

wmmhello's avatar
wmmhello 已提交
1835 1836 1837 1838
end:
  taosWUnLockLatch(&tmq->lock);
  tscDebug("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
  return code;
X
Xiaoyu Wang 已提交
1839 1840
}

H
Haojun Liao 已提交
1841
/**
 * Handle a queued response that is not a poll response.
 *
 * Only endpoint (EP) responses are accepted; any other type yields -1 and leaves *pReset
 * untouched. An EP response with an epoch newer than the consumer's updates the local
 * endpoint information and sets *pReset to true; an older or equal one is discarded and
 * sets *pReset to false. Returns 0 in both EP cases.
 */
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewer = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewer) {
    SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
    doUpdateLocalEp(tmq, rspWrapper->epoch, pEpRsp);
    tDeleteSMqAskEpRsp(pEpRsp);
  } else {
    tmqFreeRspWrapper(rspWrapper);
  }

  *pReset = isNewer;
  return 0;
}

1861
/**
 * Refresh the locally cached state of a vgroup after a poll response.
 *
 * Unless a seek has updated the vgroup since the request was sent (seekUpdated set), the
 * begin/end offsets are replaced by the request/response offsets whose type is non-zero.
 * The vgroup is then marked idle and its valid WAL version range is set to [sver, ever + 1].
 */
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
  if (pVg->seekUpdated) {
    tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
  } else {
    tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
    if (reqOffset->type != 0) {
      pVg->offsetInfo.beginOffset = *reqOffset;
    }
    if (rspOffset->type != 0) {
      pVg->offsetInfo.endOffset = *rspOffset;
    }
  }

  // the response has been consumed: the vgroup may be polled again
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

  // remember the valid wal version range reported by the response
  pVg->offsetInfo.walVerBegin = sver;
  pVg->offsetInfo.walVerEnd = ever + 1;
}

H
Haojun Liao 已提交
1879
/**
 * Drain the consumer's response queue until a response with data can be returned.
 *
 * Items are taken from tmq->qall, refilled from tmq->mqueue when it runs empty.
 *  - END response: sets terrno to TSDB_CODE_TQ_NO_COMMITTED_OFFSET and returns NULL.
 *  - poll / meta / taosx responses whose epoch matches the consumer's: the vgroup state is
 *    refreshed and, if the response carries data, a result object is built and returned.
 *    Empty responses and responses with a stale epoch are discarded and the loop goes on.
 *  - anything else is passed to tmqHandleNoPollRsp(); when that reports a reset and
 *    pollIfReset is true, a new poll round is issued.
 *
 * @param tmq          the consumer.
 * @param timeout      forwarded to tmqPollImpl() when re-polling after a reset.
 * @param pollIfReset  whether to re-poll after the endpoint information was updated.
 * @return a result object (SMqRspObj / SMqMetaRspObj / SMqTaosxRspObj), or NULL when the
 *         queue is exhausted, an END response was seen, or the vgroup/topic of a response
 *         can no longer be found.
 *
 * Every dequeued item is released on every path, including the "vg or topic not found"
 * exits, which previously leaked the response payload and the queue item.
 */
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);

  while (1) {
    SMqRspWrapper* pRspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&pRspWrapper);

    if (pRspWrapper == NULL) {
      // local batch exhausted: pull whatever has arrived in the meantime and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pRspWrapper);
      if (pRspWrapper == NULL) {
        return NULL;
      }
    }

    tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);

    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pRspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
      return NULL;
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;

      int32_t     consumerEpoch = atomic_load_32(&tmq->epoch);
      SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;

      if (pDataRsp->head.epoch == consumerEpoch) {
        taosWLockLatch(&tmq->lock);
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          taosWUnLockLatch(&tmq->lock);
          // the item is already dequeued; release it the same way a discarded response is
          tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          return NULL;
        }

        // update the epset
        if (pollRspWrapper->pEpset != NULL) {
          SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
          SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
          tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId, pVg->vgId,
                   pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
          pVg->epSet = *pollRspWrapper->pEpset;
        }

        updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever,
                     tmq->consumerId);

        char buf[TSDB_OFFSET_LEN] = {0};
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
        if (pDataRsp->blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   ", total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          // remember when the empty block arrived so that the next poll of this vgroup is delayed
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          taosFreeQitem(pollRspWrapper);
        } else {  // build rsp
          int64_t    numOfRows = 0;
          SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
          tmq->totalRows += numOfRows;
          pVg->emptyBlockReceiveTs = 0;
          tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
                   pollRspWrapper->reqId);
          taosFreeQitem(pollRspWrapper);
          taosWUnLockLatch(&tmq->lock);
          return pRsp;
        }
        taosWUnLockLatch(&tmq->lock);
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      // todo handle the wal range and epset for each vgroup
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
        taosWLockLatch(&tmq->lock);
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          taosWUnLockLatch(&tmq->lock);
          // the item is already dequeued; release it the same way a discarded response is
          tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          return NULL;
        }

        // a meta response only carries a response offset, which is used for both ends
        updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset,
                     pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
        // build rsp
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        taosWUnLockLatch(&tmq->lock);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        taosWLockLatch(&tmq->lock);
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          taosWUnLockLatch(&tmq->lock);
          // the item is already dequeued; release it the same way a discarded response is
          tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          return NULL;
        }

        updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset,
                     pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);

        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
        } else {
          pVg->emptyBlockReceiveTs = 0;  // reset the ts
          // build rsp: a plain data object when no tables were created, a metadata object otherwise
          void*   pRsp = NULL;
          int64_t numOfRows = 0;
          if (pollRspWrapper->taosxRsp.createTableNum == 0) {
            pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
          } else {
            pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
          }

          tmq->totalRows += numOfRows;

          char buf[TSDB_OFFSET_LEN] = {0};
          tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset);
          tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pollRspWrapper->taosxRsp.blockNum, numOfRows, pVg->numOfRows,
                   tmq->totalRows, pollRspWrapper->reqId);

          taosFreeQitem(pollRspWrapper);
          taosWUnLockLatch(&tmq->lock);
          return pRsp;
        }
        taosWUnLockLatch(&tmq->lock);
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else {
      tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);

      bool reset = false;
      tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
      taosFreeQitem(pRspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
    }
  }
}

2058
/**
 * Poll the consumer for the next result.
 *
 * @param tmq      the consumer; NULL yields NULL.
 * @param timeout  maximum time to wait in milliseconds; a negative value waits without a
 *                 deadline (in 1000 ms slices).
 * @return a result object, or NULL when the consumer is still initializing, does not
 *         become ready, reports "no committed offset", or the timeout elapses.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  if (tmq == NULL) {
    return NULL;
  }

  int64_t startTime = taosGetTimestampMs();

  tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
          timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscInfo("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  // while recovering, keep asking for the endpoints until the consumer leaves RECOVER status
  while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    for (int32_t retryCnt = 0; TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq);) {
      if (retryCnt++ > 40) {
        return NULL;
      }

      tscInfo("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
      taosMsleep(500);
    }
  }

  for (;;) {
    tmqHandleAllDelayedTask(tmq);

    // a failed poll round is only logged; responses already queued are still processed below
    if (tmqPollImpl(tmq, timeout) < 0) {
      tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    void* rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    }

    if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout < 0) {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
      continue;
    }

    int64_t currentTime = taosGetTimestampMs();
    int64_t elapsedTime = currentTime - startTime;
    if (elapsedTime > timeout) {
      tscInfo("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
              tmq->consumerId, tmq->epoch, startTime, currentTime);
      return NULL;
    }

    // wait for a response for at most the remaining time
    tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
  }
}

wmmhello's avatar
wmmhello 已提交
2118 2119
/**
 * Log, at debug level, the consumer's poll/row totals and the row count of every vgroup
 * of every subscribed topic. The topic list is read under the consumer's read latch.
 */
static void displayConsumeStatistics(tmq_t* pTmq) {
  taosRLockLatch(&pTmq->lock);

  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);

  tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
  for (int32_t topicIdx = 0; topicIdx < topicCnt; ++topicIdx) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, topicIdx);
    tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, topicIdx);

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t vgIdx = 0; vgIdx < vgCnt; ++vgIdx) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);
      tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, vgIdx, pVg->vgId, pVg->numOfRows);
    }
  }

  taosRUnLockLatch(&pTmq->lock);
  tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
2138

2139
/**
 * Close a consumer.
 *
 * @return TSDB_CODE_INVALID_PARA for a NULL consumer, the error of the final synchronous
 *         commit if auto-commit is enabled and that commit fails (the consumer is then
 *         NOT removed), otherwise 0.
 *
 * A consumer in READY status commits (when auto-commit is on), waits two seconds and then
 * subscribes to an empty topic list, retrying while the server answers "consumer not
 * ready". In any other status it is closed directly.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
  displayConsumeStatistics(tmq);

  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
  } else {
    // if auto commit is set, commit before close consumer. Otherwise, do nothing.
    if (tmq->autoCommit) {
      int32_t commitCode = tmq_commit_sync(tmq, NULL);
      if (commitCode != 0) {
        return commitCode;
      }
    }
    taosSsleep(2);  // sleep 2s for hb to send offset and rows to server

    // subscribe to an empty list (presumably this drops all subscriptions - confirm against tmq_subscribe)
    tmq_list_t* emptyList = tmq_list_new();
    for (int32_t retryCnt = 0;; retryCnt++) {
      int32_t subCode = tmq_subscribe(tmq, emptyList);
      if (subCode != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
        break;
      }
      taosMsleep(500);
    }
    tmq_list_destroy(emptyList);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
2175

L
Liu Jicong 已提交
2176 2177
/**
 * Translate a tmq error code into a message: 0 is "success", -1 is "fail", and every
 * other value is looked up with tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
2185

L
Liu Jicong 已提交
2186
/**
 * Classify a result returned by the consumer: data, table meta or metadata.
 * A NULL or unrecognized result yields TMQ_RES_INVALID.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (res == NULL) {
    return TMQ_RES_INVALID;
  }

  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
2201
/**
 * Return the topic name of a tmq result.
 *
 * The stored name is returned without its prefix, i.e. starting right after the first '.'.
 * If the stored name contains no '.', it is returned unchanged instead of computing a
 * pointer from a NULL strchr() result.
 *
 * @return pointer into the result object (valid as long as the result is), or NULL when
 *         `res` is NULL or not a tmq result.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2219
// Return the database name of a polled result with the leading "<prefix>."
// part stripped (everything after the first '.').
// Returns NULL when res is NULL, is not a TMQ result, or the stored name
// contains no '.' separator (previously the latter yielded an invalid pointer
// because the strchr() result was used unchecked).
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  if (fullName == NULL) {
    return NULL;
  }

  const char* sep = strchr(fullName, '.');
  return (sep != NULL) ? sep + 1 : NULL;
}

L
Liu Jicong 已提交
2238
// Return the vgroup id carried by a polled result, or -1 when res is NULL or
// is not one of the TMQ result kinds.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  int32_t vgId = -1;
  if (res == NULL) {
    return vgId;
  }

  if (TD_RES_TMQ(res)) {
    vgId = ((SMqRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_META(res)) {
    vgId = ((SMqMetaRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_METADATA(res)) {
    vgId = ((SMqTaosxRspObj*)res)->vgId;
  }

  return vgId;
}
L
Liu Jicong 已提交
2255

2256
// Return the WAL version attached to a polled result.
// - NULL res                      -> TSDB_CODE_INVALID_PARA
// - offset is not a log offset,
//   or res is not a TMQ result    -> TSDB_CODE_TMQ_SNAPSHOT_ERROR
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (res == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (TD_RES_TMQ(res)) {
    SMqRspObj*    pData = (SMqRspObj*)res;
    STqOffsetVal* pReqOffset = &pData->rsp.reqOffset;
    if (pReqOffset->type == TMQ_OFFSET__LOG) {
      return pReqOffset->version;
    }
    tscError("invalid offset type:%d", pReqOffset->type);
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    if (pMeta->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) {
      return pMeta->metaRsp.rspOffset.version;
    }
  } else if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    if (pTaosx->rsp.reqOffset.type == TMQ_OFFSET__LOG) {
      return pTaosx->rsp.reqOffset.version;
    }
  } else {
    tscError("invalid tmq type:%d", *(int8_t*)res);
  }

  // no log offset available for this result (e.g. data read from tsdb)
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}

L
Liu Jicong 已提交
2286
// Return the table name of the block the result iterator currently points at.
// Returns NULL when res is NULL, is not a data / metadata result, carries no
// table names, or the iterator is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    if (pData->rsp.withTbName && pData->rsp.blockTbName != NULL && pData->resIter >= 0 &&
        pData->resIter < pData->rsp.blockNum) {
      return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    if (pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL && pTaosx->resIter >= 0 &&
        pTaosx->resIter < pTaosx->rsp.blockNum) {
      return (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter);
    }
    return NULL;
  }

  return NULL;
}
2307

2308
// Start an asynchronous commit. With pRes == NULL all offsets are committed,
// otherwise only the offset belonging to pRes. A NULL consumer is reported to
// cb (when given) with TSDB_CODE_INVALID_PARA.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (tmq != NULL) {
    if (pRes != NULL) {
      asyncCommitFromResult(tmq, pRes, cb, param);  // single offset
    } else {
      asyncCommitAllOffsets(tmq, cb, param);  // everything
    }
    return;
  }

  tscError("invalid tmq handle, null");
  if (cb != NULL) {
    cb(tmq, TSDB_CODE_INVALID_PARA, param);
  }
}

2323
// Commit callback used by the synchronous wrappers: stores the result code in
// the SSyncCommitInfo passed as param and posts its semaphore.
static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSyncInfo = param;

  pSyncInfo->code = code;
  tsem_post(&pSyncInfo->sem);
}
2328

2329
// Commit offsets and block until the commit callback fires.
// pRes == NULL commits all offsets; otherwise only the offset of pRes.
// Returns TSDB_CODE_INVALID_PARA for a NULL consumer, TSDB_CODE_OUT_OF_MEMORY
// when the wait context cannot be allocated, else the code reported by the
// commit callback.
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  if (tmq == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    // previously the NULL pointer was passed straight to tsem_init()
    tscError("consumer:0x%" PRIx64 " failed to prepare sync commit operation", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  if (pRes == NULL) {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
  } else {
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
  }

  // commitCallBackFn stores the result in pInfo->code and posts the semaphore
  tsem_wait(&pInfo->sem);
  code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  tscInfo("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}

2357
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
2358 2359 2360 2361
static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value){
  if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
    tscError("Assignment or poll interface need to be called first");
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
2362
  }
2363 2364 2365 2366 2367 2368 2369

  if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
    tscError("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value, offset->walVerBegin, offset->walVerEnd);
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
  }

  return 0;
2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383
}

// Commit `offset` for vgroup `vgId` of topic `pTopicName` and wait for the
// result.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments,
// TSDB_CODE_TMQ_INVALID_TOPIC when "<acctId>.<topic>" does not fit into the
// full-name buffer, the error of getClientVg()/checkWalRange() when the
// vgroup lookup or range check fails, TSDB_CODE_OUT_OF_MEMORY on allocation
// failure, else the code delivered by the commit callback.
int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset){
  if (tmq == NULL || pTopicName == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
  // bounded formatting: pTopicName comes from the API caller and used to be
  // written with an unchecked sprintf()
  int32_t nameLen = snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);
  if (nameLen < 0 || nameLen >= (int32_t)sizeof(tname)) {
    tscError("consumer:0x%" PRIx64 " topic name too long:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_TMQ_INVALID_TOPIC;
  }

  taosWLockLatch(&tmq->lock);
  SMqClientVg* pVg = NULL;
  int32_t code = getClientVg(tmq, tname, vgId, &pVg);
  if(code != 0){
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
  code = checkWalRange(pOffsetInfo, offset);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }
  taosWUnLockLatch(&tmq->lock);

  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  // only wait when the request was actually issued
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
  if(code == 0){
    tsem_wait(&pInfo->sem);
    code = pInfo->code;
  }

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));

  return code;
}

2423 2424
// Asynchronously commit `offset` for vgroup `vgId` of topic `pTopicName`.
// Any failure detected before or while issuing the request is reported
// through cb (when cb is not NULL) with the corresponding error code.
void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
  int32_t code = 0;
  if (tmq == NULL || pTopicName == NULL) {
    tscError("invalid tmq handle, null");
    code = TSDB_CODE_INVALID_PARA;
    goto  end;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
  // bounded formatting: pTopicName comes from the API caller and used to be
  // written with an unchecked sprintf()
  int32_t nameLen = snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);
  if (nameLen < 0 || nameLen >= (int32_t)sizeof(tname)) {
    tscError("consumer:0x%" PRIx64 " topic name too long:%s", tmq->consumerId, pTopicName);
    code = TSDB_CODE_TMQ_INVALID_TOPIC;
    goto end;
  }

  taosWLockLatch(&tmq->lock);
  SMqClientVg* pVg = NULL;
  code = getClientVg(tmq, tname, vgId, &pVg);
  if(code != 0){
    taosWUnLockLatch(&tmq->lock);
    goto end;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
  code = checkWalRange(pOffsetInfo, offset);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    goto end;
  }
  taosWUnLockLatch(&tmq->lock);

  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};

  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);

  tscInfo("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));

end:
  // on success the callback is expected to be driven by asyncCommitOffset
  if(code != 0 && cb != NULL){
    cb(tmq, code, param);
  }
}

2463 2464 2465 2466 2467 2468 2469 2470 2471
// Ask-ep callback for the synchronous path: records the result code in the
// SAskEpInfo passed as param, applies a successful response to the local
// endpoint state, then posts the waiter's semaphore.
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  SAskEpInfo* pAskInfo = param;
  pAskInfo->code = code;

  if (code != TSDB_CODE_SUCCESS) {
    tsem_post(&pAskInfo->sem);
    return;
  }

  // payload layout: SMqRspHead followed by the encoded SMqAskEpRsp
  SMqRspHead* pHead = pDataBuf->pData;
  void*       pBody = POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead));

  SMqAskEpRsp epRsp;
  tDecodeSMqAskEpRsp(pBody, &epRsp);
  doUpdateLocalEp(pTmq, pHead->epoch, &epRsp);
  tDeleteSMqAskEpRsp(&epRsp);

  tsem_post(&pAskInfo->sem);
}

// Ask-ep callback for the queue-based path: wraps a successful response in an
// SMqAskEpRspWrapper queue item and writes it to pTmq->mqueue.
// On failure nothing is queued; the reason is left in terrno (the incoming
// code, or TSDB_CODE_OUT_OF_MEMORY when the queue item cannot be allocated).
// `param` is not used.
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return;
  }

  SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
  if (pWrapper == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  // payload layout: SMqRspHead followed by the encoded SMqAskEpRsp
  SMqRspHead* head = pDataBuf->pData;

  pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
  pWrapper->epoch = head->epoch;
  // NOTE(review): the raw header bytes are copied into `msg` and then `msg` is
  // filled again by the decoder below; the purpose of the memcpy is not evident
  // from this function - confirm before changing.
  memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead));
  tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg);

  taosWriteQitem(pTmq->mqueue, pWrapper);
}

// Synchronously ask for the endpoint set: issues asyncAskEp() with
// updateEpCallbackFn and blocks until that callback posts the semaphore.
// Returns TSDB_CODE_OUT_OF_MEMORY when the wait context cannot be allocated,
// otherwise the code stored by the callback.
int32_t doAskEp(tmq_t* pTmq) {
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (pInfo == NULL) {
    // previously the NULL pointer was passed straight to tsem_init()
    tscError("consumer:0x%" PRIx64 " failed to allocate ask-ep context", pTmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pInfo->code = 0;
  tsem_init(&pInfo->sem, 0, 0);

  asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
  tsem_wait(&pInfo->sem);

  int32_t code = pInfo->code;
  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);
  return code;
}

// Build a TDMT_MND_TMQ_ASK_EP request for this consumer and send it
// asynchronously. askEpFn is recorded in the callback parameter together with
// `param`. If the request cannot be built, askEpFn is invoked directly with
// the error code and a NULL buffer.
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
  SMqAskEpReq req = {.consumerId = pTmq->consumerId, .epoch = pTmq->epoch};
  strcpy(req.cgroup, pTmq->groupId);

  // first pass computes the encoded size
  int32_t reqLen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (reqLen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  void* pReqBuf = taosMemoryCalloc(1, reqLen);
  if (pReqBuf == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, reqLen);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  // second pass encodes into the buffer
  if (tSerializeSMqAskEpReq(pReqBuf, reqLen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, reqLen);
    taosMemoryFree(pReqBuf);
    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  SMqAskEpCbParam* pCbParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pCbParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pReqBuf);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  pCbParam->refId = pTmq->refId;
  pCbParam->epoch = pTmq->epoch;
  pCbParam->pUserFn = askEpFn;
  pCbParam->pParam = param;

  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    taosMemoryFree(pCbParam);
    taosMemoryFree(pReqBuf);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  pSendInfo->msgInfo = (SDataBuf){.pData = pReqBuf, .len = reqLen, .handle = NULL};
  pSendInfo->requestId = generateRequestId();
  pSendInfo->requestObjRefId = 0;
  pSendInfo->param = pCbParam;
  pSendInfo->fp = askEpCallbackFn;
  pSendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet mgmtEpSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, pSendInfo->requestId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, &transporterId, pSendInfo);
}

// Format "<topicName>:<vg>" into dst and return the number of characters
// written (excluding the terminator). dst must be large enough; no bound is
// passed in.
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

// Finish a commit round: invoke the user callback (if any) with the
// accumulated code and release pParamSet.
// Returns -1 with terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer
// reference can no longer be acquired (pParamSet is still freed, the callback
// is not invoked); 0 otherwise.
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  const int64_t consumerRef = pParamSet->refId;

  tmq_t* pConsumer = taosAcquireRef(tmqMgmt.rsetId, consumerRef);
  if (pConsumer != NULL) {
    if (pParamSet->callbackFn != NULL) {
      pParamSet->callbackFn(pConsumer, pParamSet->code, pParamSet->userParam);
    }

    taosMemoryFree(pParamSet);
    taosReleaseRef(tmqMgmt.rsetId, consumerRef);
    return 0;
  }

  taosMemoryFree(pParamSet);
  terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
  return -1;
}

2605
// Account for one received commit response. When the outstanding counter
// reaches zero the commit round is completed via tmqCommitDone().
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  const int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);

  if (remaining != 0) {
    tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, remaining);
    return;
  }

  tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, vgId);
  tmqCommitDone(pParamSet);
}
2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635

// Advance the result iterator to the next data block and load it into the
// object's resInfo. Returns NULL once the iterator has passed the last block.
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
  SMqRspObj* pObj = (SMqRspObj*)res;

  pObj->resIter++;
  if (pObj->resIter >= pObj->rsp.blockNum) {
    return NULL;
  }

  SReqResultInfo*    pResInfo = &pObj->resInfo;
  SRetrieveTableRsp* pBlock = (SRetrieveTableRsp*)taosArrayGetP(pObj->rsp.blockData, pObj->resIter);

  if (pObj->rsp.withSchema) {
    // install this block's schema, then drop the buffers kept in resInfo
    SSchemaWrapper* pSchema = (SSchemaWrapper*)taosArrayGetP(pObj->rsp.blockSchema, pObj->resIter);
    setResSchemaInfo(pResInfo, pSchema->pSchema, pSchema->nCols);
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    taosMemoryFreeClear(pResInfo->convertJson);
  }

  setQueryResultFromRsp(pResInfo, pBlock, convertUcs4, false);
  return pResInfo;
}

2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656
// Response callback for one TDMT_VND_TMQ_VG_WALINFO request.
// On success the vgroup's WAL range and current offset are appended to
// pCommon->pList; on failure the error is stored in pCommon->code. The
// callback that accounts for the last outstanding response posts
// pCommon->rsp to wake the waiter. Frees the message buffers and pParam.
//
// Changes vs. the previous version:
//  - the response counter is bumped only after this callback has finished
//    touching pCommon, so the waiter is not woken while another callback is
//    still appending its assignment;
//  - a missing message on the success path and a failed decode are reported
//    as errors instead of reading NULL / uninitialized data;
//  - pMsg is NULL-checked before its buffers are freed.
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon* pCommon = pParam->pCommon;

  if (code == TSDB_CODE_SUCCESS && (pMsg == NULL || pMsg->pData == NULL)) {
    code = TSDB_CODE_INVALID_PARA;
  }

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
    pCommon->code = code;
  } else {
    SMqDataRsp rsp = {0};
    SDecoder decoder = {0};
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    int32_t decodeCode = tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);

    if (decodeCode < 0) {
      tscError("consumer:0x%" PRIx64 " failed to decode the wal info from vgId:%d for topic:%s", pCommon->consumerId,
               pParam->vgId, pCommon->pTopicName);
      pCommon->code = TSDB_CODE_INVALID_PARA;
    } else {
      SMqRspHead* pHead = pMsg->pData;

      tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                         .end = pHead->walever + 1,
                                         .currentOffset = rsp.rspOffset.version,
                                         .vgId = pParam->vgId};

      taosThreadMutexLock(&pCommon->mutex);
      taosArrayPush(pCommon->pList, &assignment);
      taosThreadMutexUnlock(&pCommon->mutex);
    }
  }

  int32_t totalReq = pParam->totalReq;

  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  taosMemoryFree(pParam);

  // must be last: once the semaphore is posted the waiter may destroy pCommon
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == totalReq) {
    tsem_post(&pCommon->rsp);
  }

  return 0;
}

// Release an SMqVgCommon and everything it owns (assignment list, semaphore,
// mutex, topic name). NULL is accepted and ignored.
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  if (pCommon != NULL) {
    taosArrayDestroy(pCommon->pList);
    tsem_destroy(&pCommon->rsp);
    taosThreadMutexDestroy(&pCommon->mutex);
    taosMemoryFree(pCommon->pTopicName);
    taosMemoryFree(pCommon);
  }
}

2687 2688 2689 2690 2691 2692 2693
// True when an offset of the given type does not denote a WAL version:
// either the type is above TMQ_OFFSET__LOG, or it is below TMQ_OFFSET__LOG
// while snapshot consumption is enabled.
static bool isInSnapshotMode(int8_t type, bool useSnapshot){
  if (type > TMQ_OFFSET__LOG) {
    return true;
  }
  return (type < TMQ_OFFSET__LOG) && useSnapshot;
}

2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789
// Response callback for a TDMT_VND_TMQ_VG_COMMITTEDINFO request: decodes the
// committed offset into pParam->vgOffset, stores the result code, frees the
// message buffers and wakes the waiter.
// The decoder is now cleared on the decode-failure path as well (the old
// `goto end` skipped tDecoderClear()).
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqCommittedParam* pParam = param;

  if (code == 0 && pMsg != NULL) {
    SDecoder decoder;
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
    if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
    tDecoderClear(&decoder);
  }

  if(pMsg){
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  pParam->code = code;
  tsem_post(&pParam->sem);
  return 0;
}

// Ask the vnode for the committed offset of this consumer group on
// (tname, vgId) and block until the response arrives.
// Returns the committed WAL version on success. Otherwise returns an error
// code: TSDB_CODE_INVALID_PARA / TSDB_CODE_OUT_OF_MEMORY for local failures,
// TSDB_CODE_TMQ_SNAPSHOT_ERROR when the committed offset is not a log offset,
// or the code reported by the response callback.
//
// The version is kept in a 64-bit variable all the way to the return; it
// used to pass through an int32_t and was truncated for large versions.
int64_t getCommittedFromServer(tmq_t *tmq, char* tname, int32_t vgId, SEpSet* epSet){
  int32_t code = 0;
  SMqVgOffset pOffset = {0};

  pOffset.consumerId = tmq->consumerId;

  // subKey = "<groupId><TMQ_SEPARATOR><tname>"
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset.offset.subKey, tmq->groupId, groupLen);
  pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset.offset.subKey + groupLen + 1, tname);

  int32_t len = 0;
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
  if (code < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)buf)->vgId = htonl(vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeMqVgOffset(&encoder, &pOffset);
  tEncoderClear(&encoder);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // zero-initialized so vgOffset holds defined values even when the callback
  // is invoked without a message to decode
  SMqCommittedParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommittedParam));
  if (pParam == NULL) {
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  tsem_init(&pParam->sem, 0, 0);

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmCommittedCb;
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);

  // tmCommittedCb fills pParam->code / pParam->vgOffset and posts the semaphore
  tsem_wait(&pParam->sem);

  int64_t result = pParam->code;
  if(pParam->code == TSDB_CODE_SUCCESS){
    if(pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG){
      result = pParam->vgOffset.offset.val.version;
    }else{
      result = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
    }
  }
  tsem_destroy(&pParam->sem);
  taosMemoryFree(pParam);

  return result;
}

2790
// Return the current consume position (WAL version) of vgroup `vgId` for
// topic `pTopicName`.
// - If the local end offset is a log offset, that version is returned.
// - If it is still a reset marker (earliest/latest), the committed offset is
//   fetched from the server; when nothing has been committed the cached WAL
//   begin (earliest) or end (latest) is used.
// Errors are returned as negative codes: TSDB_CODE_INVALID_PARA,
// TSDB_CODE_TMQ_INVALID_TOPIC (full topic name does not fit), the
// getClientVg()/checkWalRange() error, or TSDB_CODE_TMQ_SNAPSHOT_ERROR.
//
// Fixes vs. the previous version: bounded topic-name formatting, the server
// result is kept as int64_t (it was truncated through an int32_t), and the
// local version is read while the latch is still held.
int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){
  if (tmq == NULL || pTopicName == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
  int32_t nameLen = snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);
  if (nameLen < 0 || nameLen >= (int32_t)sizeof(tname)) {
    tscError("consumer:0x%" PRIx64 " topic name too long:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_TMQ_INVALID_TOPIC;
  }

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t code = getClientVg(tmq, tname, vgId, &pVg);
  if(code != 0){
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
  int32_t type = pOffsetInfo->endOffset.type;
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  code = checkWalRange(pOffsetInfo, -1);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  // snapshot everything needed from the vgroup before releasing the latch
  SEpSet epSet = pVg->epSet;
  int64_t begin = pVg->offsetInfo.walVerBegin;
  int64_t end = pVg->offsetInfo.walVerEnd;
  int64_t localVersion = pOffsetInfo->endOffset.version;
  taosWUnLockLatch(&tmq->lock);

  int64_t position = 0;
  if(type == TMQ_OFFSET__LOG){
    position = localVersion;
  }else if(type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST){
    int64_t committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
    if(committed == TSDB_CODE_TMQ_NO_COMMITTED){
      if(type == TMQ_OFFSET__RESET_EARLIEST){
        position = begin;
      } else if(type == TMQ_OFFSET__RESET_LATEST){
        position = end;
      }
    }else{
      position = committed;
    }
  }else{
    tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
  }

  tscInfo("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
  return position;
}

2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884
// Return the committed offset (WAL version) of vgroup `vgId` for topic
// `pTopicName`. The locally cached committed offset is used when it is a log
// offset; otherwise the value is fetched from the server.
// Errors are returned as negative codes: TSDB_CODE_INVALID_PARA,
// TSDB_CODE_TMQ_INVALID_TOPIC (full topic name does not fit), the
// getClientVg() error, TSDB_CODE_TMQ_SNAPSHOT_ERROR, or whatever
// getCommittedFromServer() reports.
int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){
  if (tmq == NULL || pTopicName == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
  // bounded formatting: pTopicName comes from the API caller and used to be
  // written with an unchecked sprintf()
  int32_t nameLen = snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);
  if (nameLen < 0 || nameLen >= (int32_t)sizeof(tname)) {
    tscError("consumer:0x%" PRIx64 " topic name too long:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_TMQ_INVALID_TOPIC;
  }

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t code = getClientVg(tmq, tname, vgId, &pVg);
  if(code != 0){
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
  if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->endOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->committedOffset.type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  int64_t committed = 0;
  if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){
    // local cache already holds a WAL version
    committed = pOffsetInfo->committedOffset.version;
    taosWUnLockLatch(&tmq->lock);
    goto end;
  }
  // copy the endpoint set while the latch is held; the request is sent unlocked
  SEpSet epSet = pVg->epSet;
  taosWUnLockLatch(&tmq->lock);

  committed = getCommittedFromServer(tmq, tname, vgId, &epSet);

end:
  tscInfo("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
  return committed;
}

H
Haojun Liao 已提交
2897
// Return, for every vgroup of topic `pTopicName`, its WAL range and current
// offset.
// On success *assignment points to a newly allocated array of
// *numOfAssignment entries (release with tmq_free_assignment()). On failure
// *assignment is NULL, *numOfAssignment is 0 and an error code is returned.
//
// If every vgroup already has a log-type begin offset the answer is built
// from the local cache. Otherwise a TDMT_VND_TMQ_VG_WALINFO request is sent
// to each vgroup, the responses are collected by tmqGetWalInfoCb, and the
// cached WAL ranges are refreshed from them. The consumer latch is held for
// the whole call, including the wait for the responses.
//
// Fixes vs. the previous version: bounded topic-name formatting (was an
// unchecked sprintf of a caller-supplied string) and the shared response
// context's allocations are verified before any request is sent.
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
                                 int32_t* numOfAssignment) {
  if(tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL){
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }
  *numOfAssignment = 0;
  *assignment = NULL;
  SMqVgCommon* pCommon = NULL;

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};
  int32_t nameLen = snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);
  if (nameLen < 0 || nameLen >= (int32_t)sizeof(tname)) {
    tscError("consumer:0x%" PRIx64 " topic name too long:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_TMQ_INVALID_TOPIC;
  }
  int32_t code = TSDB_CODE_SUCCESS;

  taosWLockLatch(&tmq->lock);
  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    code = TSDB_CODE_TMQ_INVALID_TOPIC;
    goto end;
  }

  // in case of snapshot is opened, no valid offset will return
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
      tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
      goto end;
    }
  }

  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  bool needFetch = false;

  // fast path: fill from the local cache until a vgroup without a log-type
  // begin offset is found
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
      needFetch = true;
      break;
    }

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
    pAssignment->vgId = pClientVg->vgId;
    tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId,
            pAssignment->vgId, pAssignment->currentOffset);
  }

  if (needFetch) {
    pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
    if (pCommon == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = terrno;
      goto end;
    }

    pCommon->pList= taosArrayInit(4, sizeof(tmq_topic_assignment));
    tsem_init(&pCommon->rsp, 0, 0);
    taosThreadMutexInit(&pCommon->mutex, 0);
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
    pCommon->consumerId = tmq->consumerId;

    // the callbacks push into pList and log pTopicName; refuse to send
    // anything if either could not be allocated
    if (pCommon->pList == NULL || pCommon->pTopicName == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = terrno;
      goto end;
    }

    // NOTE(review): if one of the failure branches below is taken after some
    // requests have already been sent, their callbacks still reference
    // pCommon, which is destroyed at `end`. Confirm how in-flight requests are
    // expected to be handled in that case.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);

      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
      if (pParam == NULL) {
        code = terrno;
        goto end;
      }

      pParam->epoch = tmq->epoch;
      pParam->vgId = pClientVg->vgId;
      pParam->totalReq = *numOfAssignment;
      pParam->pCommon = pCommon;

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
      req.reqOffset = pClientVg->offsetInfo.beginOffset;

      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        taosMemoryFree(pParam);
        code = terrno;
        goto end;
      }

      char* msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        taosMemoryFree(pParam);
        code = terrno;
        goto end;
      }

      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
        taosMemoryFree(pParam);
        code = terrno;
        goto end;
      }

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pParam);
        taosMemoryFree(msg);
        code = terrno;
        goto end;
      }

      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
      sendInfo->requestId = req.reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqGetWalInfoCb;
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

      int64_t transporterId = 0;
      char    offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);

      tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
              tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
    }

    // woken by the callback that accounts for the last response
    tsem_wait(&pCommon->rsp);
    code = pCommon->code;

    terrno = code;
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
    int32_t num = taosArrayGetSize(pCommon->pList);
    for(int32_t i = 0; i < num; ++i) {
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
    }
    *numOfAssignment = num;

    // refresh the cached WAL range of every vgroup that answered
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
      tmq_topic_assignment* p = &(*assignment)[j];

      for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
        if (pClientVg->vgId != p->vgId) {
          continue;
        }

        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
        tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%"PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset);

        pOffsetInfo->walVerBegin = p->begin;
        pOffsetInfo->walVerEnd = p->end;
      }
    }
  }

end:
  if(code != TSDB_CODE_SUCCESS){
    taosMemoryFree(*assignment);
    *assignment = NULL;
    *numOfAssignment = 0;
  }
  destroyCommonInfo(pCommon);
  taosWUnLockLatch(&tmq->lock);
  return code;
}

T
t_max 已提交
3077 3078 3079 3080 3081 3082 3083 3084
/**
 * Release a tmq_topic_assignment buffer.
 *
 * @param pAssignment  buffer to free; NULL is accepted and ignored.
 */
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
  if (pAssignment != NULL) {
    taosMemoryFree(pAssignment);
  }
}

3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095
/**
 * Response callback for a seek request.
 *
 * Frees the response payload and epSet when a response buffer is supplied,
 * stores the result code in the SMqSeekParam passed through `param`, and
 * posts its semaphore.
 *
 * @param param  SMqSeekParam* registered with the request
 * @param pMsg   response buffer; may be NULL
 * @param code   result code of the request
 * @return always 0
 */
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  SMqSeekParam* pSeek = (SMqSeekParam*)param;
  pSeek->code = code;

  // post only after the code has been stored, so the waiter reads a valid value
  tsem_post(&pSeek->sem);
  return 0;
}

3096
// The seek interface has to send a msg to the server to cancel the push handle if needed, because the consumer may be in wait status when there is no data to poll
3097
/**
 * Move the consume position of one vgroup of a topic to the given WAL version.
 *
 * The local offset info of the vgroup is updated while holding tmq->lock; after
 * the lock is released a TDMT_VND_TMQ_SEEK request is sent to the vnode and the
 * call blocks until tmqSeekCb reports the result code.
 *
 * @param tmq         consumer handle, must not be NULL
 * @param pTopicName  topic name without the account prefix, must not be NULL
 * @param vgId        id of the vgroup whose offset is moved
 * @param offset      target version, checked with checkWalRange()
 * @return 0 on success;
 *         TSDB_CODE_INVALID_PARA for NULL arguments or a topic name that does not
 *         fit into TSDB_TOPIC_FNAME_LEN;
 *         TSDB_CODE_TMQ_SNAPSHOT_ERROR when isInSnapshotMode() reports snapshot mode;
 *         TSDB_CODE_PAR_INTERNAL_ERROR / TSDB_CODE_OUT_OF_MEMORY on request build failure;
 *         otherwise the code returned by getClientVg(), checkWalRange() or the seek callback.
 */
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL || pTopicName == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[TSDB_TOPIC_FNAME_LEN] = {0};

  // Bounded formatting: pTopicName comes from the caller, so an over-long name
  // must be rejected instead of overflowing the stack buffer.
  int32_t nameLen = snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);
  if (nameLen < 0 || nameLen >= (int32_t)sizeof(tname)) {
    tscError("consumer:0x%" PRIx64 " topic name too long, seek not allowed", tmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  taosWLockLatch(&tmq->lock);

  SMqClientVg* pVg = NULL;
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;

  int32_t type = pOffsetInfo->endOffset.type;
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
    taosWUnLockLatch(&tmq->lock);
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
  }

  code = checkWalRange(pOffsetInfo, offset);
  if (code != 0) {
    taosWUnLockLatch(&tmq->lock);
    return code;
  }

  tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
  // update the offset, and then commit to vnode
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
  pOffsetInfo->endOffset.version = offset;
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
  pVg->seekUpdated = true;

  // copy the epSet while still holding the lock; pVg is not touched after unlocking
  SEpSet epSet = pVg->epSet;
  taosWUnLockLatch(&tmq->lock);

  SMqSeekReq req = {0};
  snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
  req.head.vgId = vgId;
  req.consumerId = tmq->consumerId;

  // first pass with a NULL buffer only computes the encoded size
  int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
  if (msgSize < 0) {
    return TSDB_CODE_PAR_INTERNAL_ERROR;
  }

  char* msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
    taosMemoryFree(msg);
    return TSDB_CODE_PAR_INTERNAL_ERROR;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(msg);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
  if (pParam == NULL) {
    taosMemoryFree(msg);
    taosMemoryFree(sendInfo);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  tsem_init(&pParam->sem, 0, 0);

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqSeekCb;
  sendInfo->msgType = TDMT_VND_TMQ_SEEK;

  // NOTE(review): the return value of asyncSendMsgToServer is not checked; if a
  // send can fail without tmqSeekCb being invoked, the wait below would never
  // return. Confirm the failure/ownership contract before adding handling here.
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // block until tmqSeekCb stores the result code and posts the semaphore
  tsem_wait(&pParam->sem);
  code = pParam->code;
  tsem_destroy(&pParam->sem);
  taosMemoryFree(pParam);

  tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));

  return code;
}